From fe735bca7078a97653f027a800a63b3cd54fd43e Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Mon, 25 Nov 2024 22:54:50 +0800 Subject: [PATCH] refactor: add Schema trait --- src/compaction/mod.rs | 10 +- src/inmem/immutable.rs | 112 ++++++--- src/inmem/mutable.rs | 70 ++++-- src/lib.rs | 22 +- src/ondisk/arrows.rs | 11 +- src/ondisk/sstable.rs | 26 +- src/record/internal.rs | 6 +- src/record/mod.rs | 120 +++++++--- src/record/runtime/array.rs | 44 ++-- src/record/runtime/column.rs | 400 ------------------------------- src/record/runtime/mod.rs | 6 +- src/record/runtime/record.rs | 109 +++------ src/record/runtime/record_ref.rs | 20 +- src/record/test.rs | 64 ++--- src/stream/level.rs | 11 +- src/transaction.rs | 18 +- src/version/mod.rs | 22 +- src/version/set.rs | 4 +- src/wal/mod.rs | 6 +- src/wal/record_entry.rs | 15 +- tests/wasm.rs | 2 +- 21 files changed, 405 insertions(+), 693 deletions(-) delete mode 100644 src/record/runtime/column.rs diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index a358f606..51b41713 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -527,7 +527,7 @@ pub(crate) mod tests { executor::tokio::TokioExecutor, fs::{manager::StoreManager, FileId, FileType}, inmem::{immutable::Immutable, mutable::Mutable}, - record::{Column, ColumnDesc, Datatype, DynRecord, Record, RecordInstance}, + record::{Datatype, DynRecord, Record, RecordInstance, Value, ValueDesc}, scope::Scope, tests::Test, timestamp::Timestamp, @@ -712,7 +712,7 @@ pub(crate) mod tests { .unwrap(); let empty_record = DynRecord::empty_record( - vec![ColumnDesc::new("id".to_owned(), Datatype::Int32, false)], + vec![ValueDesc::new("id".to_owned(), Datatype::Int32, false)], 0, ); let instance = RecordInstance::Runtime(empty_record); @@ -720,7 +720,7 @@ pub(crate) mod tests { let mut batch1_data = vec![]; let mut batch2_data = vec![]; for i in 0..40 { - let col = Column::new(Datatype::Int32, "id".to_owned(), Arc::new(i), false); + let col = Value::new(Datatype::Int32, "id".to_owned(), Arc::new(i), false); if i % 4 == 0 { continue; } @@ -758,11 +758,11 @@ pub(crate) mod tests { .unwrap(); assert_eq!( scope.min, - Column::new(Datatype::Int32, "id".to_owned(), Arc::new(2), false) + Value::new(Datatype::Int32, "id".to_owned(), Arc::new(2), false) ); assert_eq!( scope.max, - Column::new(Datatype::Int32, "id".to_owned(), Arc::new(39), false) + Value::new(Datatype::Int32, "id".to_owned(), Arc::new(39), false) ); } diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs index 4077e282..c9af2021 100644 --- a/src/inmem/immutable.rs +++ b/src/inmem/immutable.rs @@ -5,12 +5,12 @@ use std::{ sync::Arc, }; -use arrow::{array::RecordBatch, datatypes::Schema}; +use arrow::{array::RecordBatch, datatypes::Schema as ArrowSchema}; use crossbeam_skiplist::SkipMap; use parquet::arrow::ProjectionMask; use crate::{ - record::{internal::InternalRecordRef, Key, Record, RecordInstance, RecordRef}, + record::{internal::InternalRecordRef, Key, Record, RecordRef, Schema}, stream::record_batch::RecordBatchEntry, timestamp::{Timestamp, Timestamped, TimestampedRef, EPOCH}, }; @@ -20,7 +20,7 @@ pub trait ArrowArrays: Sized + Sync { type Builder: Builder; - fn builder(schema: &Arc, capacity: usize) -> Self::Builder; + fn builder(schema: Arc, capacity: usize) -> Self::Builder; fn get( &self, @@ -37,7 +37,7 @@ where { fn push( &mut self, - key: Timestamped<<::Key as Key>::Ref<'_>>, + key: Timestamped<<<::Schema as Schema>::Key as Key>::Ref<'_>>, row: Option<::Ref<'_>>, ); @@ -51,26 +51,23 @@ where A: ArrowArrays, { data: A, - index: BTreeMap::Key>, u32>, + index: BTreeMap::Schema as Schema>::Key>, u32>, } -impl - From<( - SkipMap::Key>, Option>, - &RecordInstance, - )> for Immutable +impl Immutable where A: ArrowArrays, A::Record: Send, { - fn from( - (mutable, instance): ( - SkipMap::Key>, Option>, - &RecordInstance, - ), + pub(crate) fn new( + mutable: SkipMap< + Timestamped<<::Schema as Schema>::Key>, + Option, + >, + schema: Arc, ) -> Self { let mut index = BTreeMap::new(); - let mut builder = A::builder(&instance.arrow_schema::(), mutable.len()); + let mut builder = A::builder(schema, mutable.len()); for (offset, (key, value)) in mutable.into_iter().enumerate() { builder.push( @@ -93,8 +90,8 @@ where pub(crate) fn scope( &self, ) -> ( - Option<&::Key>, - Option<&::Key>, + Option<&<::Schema as Schema>::Key>, + Option<&<::Schema as Schema>::Key>, ) { ( self.index.first_key_value().map(|(key, _)| key.value()), @@ -109,8 +106,8 @@ where pub(crate) fn scan<'scan>( &'scan self, range: ( - Bound<&'scan ::Key>, - Bound<&'scan ::Key>, + Bound<&'scan <::Schema as Schema>::Key>, + Bound<&'scan <::Schema as Schema>::Key>, ), ts: Timestamp, projection_mask: ProjectionMask, @@ -128,14 +125,16 @@ where let range = self .index - .range::::Key>, _>((lower, upper)); + .range::::Schema as Schema>::Key>, _>(( + lower, upper, + )); ImmutableScan::::new(range, self.data.as_record_batch(), projection_mask) } pub(crate) fn get( &self, - key: &::Key, + key: &<::Schema as Schema>::Key, ts: Timestamp, projection_mask: ProjectionMask, ) -> Option> { @@ -147,9 +146,13 @@ where .next() } - pub(crate) fn check_conflict(&self, key: &::Key, ts: Timestamp) -> bool { + pub(crate) fn check_conflict( + &self, + key: &<::Schema as Schema>::Key, + ts: Timestamp, + ) -> bool { self.index - .range::::Key>, _>(( + .range::::Schema as Schema>::Key>, _>(( Bound::Excluded(TimestampedRef::new(key, u32::MAX.into())), Bound::Excluded(TimestampedRef::new(key, ts)), )) @@ -162,7 +165,7 @@ pub struct ImmutableScan<'iter, R> where R: Record, { - range: Range<'iter, Timestamped, u32>, + range: Range<'iter, Timestamped<::Key>, u32>, record_batch: &'iter RecordBatch, projection_mask: ProjectionMask, } @@ -172,7 +175,7 @@ where R: Record, { fn new( - range: Range<'iter, Timestamped, u32>, + range: Range<'iter, Timestamped<::Key>, u32>, record_batch: &'iter RecordBatch, projection_mask: ProjectionMask, ) -> Self { @@ -221,17 +224,61 @@ pub(crate) mod tests { Array, BooleanArray, BooleanBufferBuilder, BooleanBuilder, PrimitiveBuilder, RecordBatch, StringArray, StringBuilder, UInt32Array, UInt32Builder, }, - datatypes::{ArrowPrimitiveType, Schema, UInt32Type}, + datatypes::{ArrowPrimitiveType, DataType, Field, Schema as ArrowSchema, UInt32Type}, }; - use parquet::arrow::ProjectionMask; + use once_cell::sync::Lazy; + use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath}; use super::{ArrowArrays, Builder}; use crate::{ - record::Record, + record::{Record, Schema}, tests::{Test, TestRef}, timestamp::timestamped::Timestamped, }; + pub struct TestSchema; + + impl Schema for TestSchema { + type Record = Test; + + type Columns = TestImmutableArrays; + + type Key = String; + + fn arrow_schema(&self) -> &Arc { + static SCHEMA: Lazy> = Lazy::new(|| { + Arc::new(ArrowSchema::new(vec![ + Field::new("_null", DataType::Boolean, false), + Field::new("_ts", DataType::UInt32, false), + Field::new("vstring", DataType::Utf8, false), + Field::new("vu32", DataType::UInt32, false), + Field::new("vbool", DataType::Boolean, true), + ])) + }); + + &SCHEMA + } + + fn primary_key_index(&self) -> usize { + 2 + } + + fn primary_key_path( + &self, + ) -> ( + parquet::schema::types::ColumnPath, + Vec, + ) { + ( + ColumnPath::new(vec!["_ts".to_string(), "vstring".to_string()]), + vec![ + SortingColumn::new(1, true, true), + SortingColumn::new(2, false, true), + ], + ) + } + } + #[derive(Debug)] pub struct TestImmutableArrays { _null: Arc, @@ -248,7 +295,7 @@ pub(crate) mod tests { type Builder = TestBuilder; - fn builder(_schema: &Arc, capacity: usize) -> Self::Builder { + fn builder(_schema: Arc, capacity: usize) -> Self::Builder { TestBuilder { vstring: StringBuilder::with_capacity(capacity, 0), vu32: PrimitiveBuilder::::with_capacity(capacity), @@ -336,10 +383,9 @@ pub(crate) mod tests { let vbool = Arc::new(self.vobool.finish()); let _null = Arc::new(BooleanArray::new(self._null.finish(), None)); let _ts = Arc::new(self._ts.finish()); + let schema = TestSchema; let mut record_batch = RecordBatch::try_new( - Arc::clone( - <::Record as Record>::arrow_schema(), - ), + Arc::clone(schema.arrow_schema()), vec![ Arc::clone(&_null) as Arc, Arc::clone(&_ts) as Arc, diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index e5ff8d33..49a78177 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -10,7 +10,7 @@ use fusio::{buffered::BufWriter, DynFs, DynWrite}; use crate::{ fs::{FileId, FileType}, inmem::immutable::Immutable, - record::{Key, KeyRef, Record, RecordInstance}, + record::{Key, KeyRef, Record, Schema}, timestamp::{ timestamped::{Timestamped, TimestampedRef}, Timestamp, EPOCH, @@ -22,12 +22,12 @@ use crate::{ pub(crate) type MutableScan<'scan, R> = Range< 'scan, - TimestampedRef<::Key>, + TimestampedRef<<::Schema as Schema>::Key>, ( - Bound<&'scan TimestampedRef<::Key>>, - Bound<&'scan TimestampedRef<::Key>>, + Bound<&'scan TimestampedRef<<::Schema as Schema>::Key>>, + Bound<&'scan TimestampedRef<<::Schema as Schema>::Key>>, ), - Timestamped<::Key>, + Timestamped<<::Schema as Schema>::Key>, Option, >; @@ -35,9 +35,11 @@ pub struct Mutable where R: Record, { - pub(crate) data: SkipMap, Option>, + pub(crate) data: SkipMap::Key>, Option>, wal: Option, R>>>, pub(crate) trigger: Arc + Send + Sync>>, + + pub(super) schema: Arc, } impl Mutable @@ -48,6 +50,7 @@ where option: &DbOption, trigger: Arc + Send + Sync>>, fs: &Arc, + schema: Arc, ) -> Result { let mut wal = None; if option.use_wal { @@ -66,6 +69,7 @@ where data: Default::default(), wal, trigger, + schema, }) } } @@ -87,7 +91,7 @@ where pub(crate) async fn remove( &self, log_ty: LogType, - key: R::Key, + key: ::Key, ts: Timestamp, ) -> Result> { self.append(Some(log_ty), key, ts, None).await @@ -96,7 +100,7 @@ where pub(crate) async fn append( &self, log_ty: Option, - key: R::Key, + key: ::Key, ts: Timestamp, value: Option, ) -> Result> { @@ -123,11 +127,11 @@ where pub(crate) fn get( &self, - key: &R::Key, + key: &::Key, ts: Timestamp, - ) -> Option, Option>> { + ) -> Option::Key>, Option>> { self.data - .range::, _>(( + .range::::Key>, _>(( Bound::Included(TimestampedRef::new(key, ts)), Bound::Included(TimestampedRef::new(key, EPOCH)), )) @@ -136,7 +140,10 @@ where pub(crate) fn scan<'scan>( &'scan self, - range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), + range: ( + Bound<&'scan ::Key>, + Bound<&'scan ::Key>, + ), ts: Timestamp, ) -> MutableScan<'scan, R> { let lower = match range.0 { @@ -157,9 +164,9 @@ where self.data.is_empty() } - pub(crate) fn check_conflict(&self, key: &R::Key, ts: Timestamp) -> bool { + pub(crate) fn check_conflict(&self, key: &::Key, ts: Timestamp) -> bool { self.data - .range::::Key>, _>(( + .range::::Key>, _>(( Bound::Excluded(TimestampedRef::new(key, u32::MAX.into())), Bound::Excluded(TimestampedRef::new(key, ts)), )) @@ -169,8 +176,7 @@ where pub(crate) async fn into_immutable( self, - instance: &RecordInstance, - ) -> Result<(Option, Immutable), fusio::Error> { + ) -> Result<(Option, Immutable<::Columns>), fusio::Error> { let mut file_id = None; if let Some(wal) = self.wal { @@ -179,7 +185,10 @@ where file_id = Some(wal_guard.file_id()); } - Ok((file_id, Immutable::from((self.data, instance)))) + Ok(( + file_id, + Immutable::new(self.data, self.schema.arrow_schema().clone()), + )) } pub(crate) async fn flush_wal(&self) -> Result<(), DbError> { @@ -209,7 +218,8 @@ mod tests { use super::Mutable; use crate::{ - record::{Column, Datatype, DynRecord, Record}, + inmem::immutable::tests::TestSchema, + record::{test::StringSchema, Datatype, DynRecord, DynSchema, Record, Value, ValueDesc}, tests::{Test, TestRef}, timestamp::Timestamped, trigger::TriggerFactory, @@ -228,7 +238,9 @@ mod tests { fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let mem_table = Mutable::::new(&option, trigger, &fs).await.unwrap(); + let mem_table = Mutable::::new(&option, trigger, &fs, TestSchema) + .await + .unwrap(); mem_table .insert( @@ -277,7 +289,9 @@ mod tests { let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let mutable = Mutable::::new(&option, trigger, &fs).await.unwrap(); + let mutable = Mutable::::new(&option, trigger, &fs, Arc::new(StringSchema)) + .await + .unwrap(); mutable .insert(LogType::Full, "1".into(), 0_u32.into()) @@ -365,7 +379,15 @@ mod tests { let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let mutable = Mutable::::new(&option, trigger, &fs) + let schema = Arc::new(DynSchema::new( + vec![ + ValueDesc::new("age".to_string(), Datatype::Int8, false), + ValueDesc::new("height".to_string(), Datatype::Int16, true), + ], + 0, + )); + + let mutable = Mutable::::new(&option, trigger, &fs, schema) .await .unwrap(); @@ -374,8 +396,8 @@ mod tests { LogType::Full, DynRecord::new( vec![ - Column::new(Datatype::Int8, "age".to_string(), Arc::new(1_i8), false), - Column::new( + Value::new(Datatype::Int8, "age".to_string(), Arc::new(1_i8), false), + Value::new( Datatype::Int16, "height".to_string(), Arc::new(1236_i16), @@ -395,7 +417,7 @@ mod tests { assert_eq!( entry.key(), &Timestamped::new( - Column::new(Datatype::Int8, "age".to_string(), Arc::new(1_i8), false), + Value::new(Datatype::Int8, "age".to_string(), Arc::new(1_i8), false), 0_u32.into() ) ); diff --git a/src/lib.rs b/src/lib.rs index 6636de7d..1ac0f054 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -149,7 +149,7 @@ use parquet::{ errors::ParquetError, }; use parquet_lru::{DynLruCache, NoCache}; -use record::{ColumnDesc, DynRecord, Record, RecordInstance}; +use record::{DynRecord, Record, ValueDesc}; use thiserror::Error; use timestamp::{Timestamp, TimestampedRef}; use tokio::sync::oneshot; @@ -181,7 +181,7 @@ where { schema: Arc>>, version_set: VersionSet, - lock_map: LockMap, + lock_map: LockMap<::Key>, manager: Arc, parquet_lru: ParquetLru, _p: PhantomData, @@ -195,7 +195,7 @@ where pub async fn with_schema( option: DbOption, executor: E, - column_descs: Vec, + column_descs: Vec, primary_index: usize, ) -> Result> { let option = Arc::new(option); @@ -912,8 +912,8 @@ pub(crate) mod tests { record::{ internal::InternalRecordRef, runtime::test::{test_dyn_item_schema, test_dyn_items}, - Column, Datatype, DynRecord, RecordDecodeError, RecordEncodeError, RecordInstance, - RecordRef, + Datatype, DynRecord, RecordDecodeError, RecordEncodeError, RecordInstance, RecordRef, + Value, }, serdes::{Decode, Encode}, trigger::{TriggerFactory, TriggerType}, @@ -1841,7 +1841,7 @@ pub(crate) mod tests { let tx = db.transaction().await; for i in 0..50 { - let key = Column::new(Datatype::Int64, "id".to_string(), Arc::new(i as i64), false); + let key = Value::new(Datatype::Int64, "id".to_string(), Arc::new(i as i64), false); let option1 = tx.get(&key, Projection::All).await.unwrap(); if i == 28 { assert!(option1.is_none()); @@ -1935,8 +1935,8 @@ pub(crate) mod tests { // test scan { let tx = db.transaction().await; - let lower = Column::new(Datatype::Int64, "id".to_owned(), Arc::new(0_i64), false); - let upper = Column::new(Datatype::Int64, "id".to_owned(), Arc::new(49_i64), false); + let lower = Value::new(Datatype::Int64, "id".to_owned(), Arc::new(0_i64), false); + let upper = Value::new(Datatype::Int64, "id".to_owned(), Arc::new(49_i64), false); let mut scan = tx .scan((Bound::Included(&lower), Bound::Included(&upper))) .projection(vec![0, 2, 7]) @@ -2090,7 +2090,7 @@ pub(crate) mod tests { let tx3 = db3.transaction().await; for i in 0..50 { - let key = Column::new(Datatype::Int64, "id".to_string(), Arc::new(i as i64), false); + let key = Value::new(Datatype::Int64, "id".to_string(), Arc::new(i as i64), false); let option1 = tx1.get(&key, Projection::All).await.unwrap(); let option2 = tx2.get(&key, Projection::All).await.unwrap(); let option3 = tx3.get(&key, Projection::All).await.unwrap(); @@ -2148,8 +2148,8 @@ pub(crate) mod tests { // test scan { let tx1 = db1.transaction().await; - let lower = Column::new(Datatype::Int64, "id".to_owned(), Arc::new(8_i64), false); - let upper = Column::new(Datatype::Int64, "id".to_owned(), Arc::new(43_i64), false); + let lower = Value::new(Datatype::Int64, "id".to_owned(), Arc::new(8_i64), false); + let upper = Value::new(Datatype::Int64, "id".to_owned(), Arc::new(43_i64), false); let mut scan = tx1 .scan((Bound::Included(&lower), Bound::Included(&upper))) .projection(vec![0, 1]) diff --git a/src/ondisk/arrows.rs b/src/ondisk/arrows.rs index fdca1b10..12875a8b 100644 --- a/src/ondisk/arrows.rs +++ b/src/ondisk/arrows.rs @@ -15,14 +15,14 @@ use parquet::{ }; use crate::{ - record::{Key, Record}, + record::{Key, Record, Schema}, timestamp::Timestamp, }; unsafe fn get_range_bound_fn( - range: Bound<&R::Key>, + range: Bound<&::Key>, ) -> ( - Option<&'static R::Key>, + Option<&'static ::Key>, &'static (dyn Fn(&dyn Datum, &dyn Datum) -> Result + Sync), ) where @@ -54,7 +54,10 @@ where pub(crate) unsafe fn get_range_filter( schema_descriptor: &SchemaDescriptor, - range: (Bound<&R::Key>, Bound<&R::Key>), + range: ( + Bound<&::Key>, + Bound<&::Key>, + ), ts: Timestamp, ) -> RowFilter where diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 60e05af3..32b32f71 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -16,7 +16,7 @@ use ulid::Ulid; use super::{arrows::get_range_filter, scan::SsTableScan}; use crate::{ - record::Record, + record::{Record, Schema}, stream::record_batch::RecordBatchEntry, timestamp::{Timestamp, TimestampedRef}, }; @@ -70,7 +70,7 @@ where pub(crate) async fn get( self, - key: &TimestampedRef, + key: &TimestampedRef<::Key>, projection_mask: ProjectionMask, ) -> ParquetResult>> { self.scan( @@ -87,7 +87,10 @@ where pub(crate) async fn scan<'scan>( self, - range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), + range: ( + Bound<&'scan ::Key>, + Bound<&'scan ::Key>, + ), ts: Timestamp, limit: Option, projection_mask: ProjectionMask, @@ -134,7 +137,8 @@ pub(crate) mod tests { use crate::{ executor::tokio::TokioExecutor, fs::{manager::StoreManager, FileType}, - record::Record, + inmem::immutable::tests::TestSchema, + record::{Record, Schema}, tests::{get_test_record_batch, Test}, timestamp::Timestamped, DbOption, @@ -153,7 +157,7 @@ pub(crate) mod tests { ); let mut writer = AsyncArrowWriter::try_new_with_options( AsyncWriter::new(file), - Test::arrow_schema().clone(), + TestSchema {}.arrow_schema().clone(), options, ) .expect("Failed to create writer"); @@ -211,7 +215,7 @@ pub(crate) mod tests { .get( key.borrow(), ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), [0, 1, 2, 3], ), ) @@ -228,7 +232,7 @@ pub(crate) mod tests { .get( key.borrow(), ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), [0, 1, 2, 4], ), ) @@ -245,7 +249,7 @@ pub(crate) mod tests { .get( key.borrow(), ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), [0, 1, 2], ), ) @@ -286,7 +290,7 @@ pub(crate) mod tests { 1_u32.into(), None, ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), [0, 1, 2, 3], ), ) @@ -311,7 +315,7 @@ pub(crate) mod tests { 1_u32.into(), None, ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), [0, 1, 2, 4], ), ) @@ -336,7 +340,7 @@ pub(crate) mod tests { 1_u32.into(), None, ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), [0, 1, 2], ), ) diff --git a/src/record/internal.rs b/src/record/internal.rs index c2192e5a..b2212e32 100644 --- a/src/record/internal.rs +++ b/src/record/internal.rs @@ -1,6 +1,6 @@ use std::{marker::PhantomData, mem::transmute}; -use super::{Key, Record, RecordRef}; +use super::{Key, Record, RecordRef, Schema}; use crate::timestamp::{Timestamp, Timestamped}; #[derive(Debug)] @@ -32,7 +32,9 @@ impl<'r, R> InternalRecordRef<'r, R> where R: RecordRef<'r>, { - pub fn value(&self) -> Timestamped<<::Key as Key>::Ref<'_>> { + pub fn value( + &self, + ) -> Timestamped<<<::Schema as Schema>::Key as Key>::Ref<'_>> { // Safety: shorter lifetime of the value must be safe unsafe { transmute(Timestamped::new(self.record.clone().key(), self.ts)) } } diff --git a/src/record/mod.rs b/src/record/mod.rs index 4712fa5d..127180ee 100644 --- a/src/record/mod.rs +++ b/src/record/mod.rs @@ -2,11 +2,12 @@ pub mod internal; mod key; pub mod runtime; #[cfg(test)] -mod test; +pub(crate) mod test; use std::{error::Error, fmt::Debug, io, sync::Arc}; -use arrow::{array::RecordBatch, datatypes::Schema}; +use array::DynRecordImmutableArrays; +use arrow::{array::RecordBatch, datatypes::Schema as ArrowSchema}; use internal::InternalRecordRef; pub use key::{Key, KeyRef}; use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath}; @@ -18,63 +19,112 @@ use crate::{ serdes::{Decode, Encode}, }; -#[allow(unused)] -pub(crate) enum RecordInstance { - Normal, - Runtime(DynRecord), +// #[allow(unused)] +// pub(crate) enum RecordInstance { +// Normal, +// Runtime(DynRecord), +// } + +// #[allow(unused)] +// impl RecordInstance { +// pub(crate) fn primary_key_index(&self) -> usize +// where +// R: Record, +// { +// match self { +// RecordInstance::Normal => R::primary_key_index(), +// RecordInstance::Runtime(record) => record.primary_key_index(), +// } +// } + +// pub(crate) fn arrow_schema(&self) -> Arc +// where +// R: Record, +// { +// match self { +// RecordInstance::Normal => R::arrow_schema().clone(), +// RecordInstance::Runtime(record) => record.arrow_schema(), +// } +// } +// } + +pub trait Schema { + type Record: Record; + + type Columns: ArrowArrays; + + type Key: Key; + + fn arrow_schema(&self) -> &Arc; + + fn primary_key_index(&self) -> usize; + + fn primary_key_path(&self) -> (ColumnPath, Vec); } -#[allow(unused)] -impl RecordInstance { - pub(crate) fn primary_key_index(&self) -> usize - where - R: Record, - { - match self { - RecordInstance::Normal => R::primary_key_index(), - RecordInstance::Runtime(record) => record.primary_key_index(), +#[derive(Debug)] +pub struct DynSchema { + schema: Vec, + primary_index: usize, + arrow_schema: Arc, +} + +impl DynSchema { + pub fn new(schema: Vec, primary_index: usize) -> Self { + let arrow_schema = Arc::new(ArrowSchema::new( + schema + .iter() + .map(|desc| desc.arrow_field()) + .collect::>(), + )); + Self { + schema, + primary_index, + arrow_schema, } } +} - pub(crate) fn arrow_schema(&self) -> Arc - where - R: Record, - { - match self { - RecordInstance::Normal => R::arrow_schema().clone(), - RecordInstance::Runtime(record) => record.arrow_schema(), - } +impl Schema for DynSchema { + type Record = DynRecord; + + type Columns = DynRecordImmutableArrays; + + type Key = Value; + + fn arrow_schema(&self) -> &Arc { + &self.arrow_schema + } + + fn primary_key_index(&self) -> usize { + self.primary_index + } + + fn primary_key_path(&self) -> (ColumnPath, Vec) { + unimplemented!() } } pub trait Record: 'static + Sized + Decode + Debug + Send + Sync { - type Columns: ArrowArrays; - - type Key: Key; + type Schema: Schema; type Ref<'r>: RecordRef<'r, Record = Self> where Self: 'r; - fn key(&self) -> <::Key as Key>::Ref<'_> { + fn key(&self) -> <<::Schema as Schema>::Key as Key>::Ref<'_> { self.as_record_ref().key() } - fn primary_key_index() -> usize; - - fn primary_key_path() -> (ColumnPath, Vec); - fn as_record_ref(&self) -> Self::Ref<'_>; - fn arrow_schema() -> &'static Arc; - fn size(&self) -> usize; } pub trait RecordRef<'r>: Clone + Sized + Encode + Send + Sync { type Record: Record; - fn key(self) -> <::Key as Key>::Ref<'r>; + fn key(self) -> <<::Schema as Schema>::Key as Key>::Ref<'r>; fn projection(&mut self, projection_mask: &ProjectionMask); @@ -82,7 +132,7 @@ pub trait RecordRef<'r>: Clone + Sized + Encode + Send + Sync { record_batch: &'r RecordBatch, offset: usize, projection_mask: &'r ProjectionMask, - full_schema: &'r Arc, + full_schema: &'r Arc, ) -> InternalRecordRef<'r, Self>; } diff --git a/src/record/runtime/array.rs b/src/record/runtime/array.rs index 9cddeeb5..22a31cd5 100644 --- a/src/record/runtime/array.rs +++ b/src/record/runtime/array.rs @@ -7,15 +7,15 @@ use arrow::{ StringArray, StringBuilder, UInt32Builder, }, datatypes::{ - Int16Type, Int32Type, Int64Type, Int8Type, Schema, UInt16Type, UInt32Type, UInt64Type, - UInt8Type, + Int16Type, Int32Type, Int64Type, Int8Type, Schema as ArrowSchema, UInt16Type, UInt32Type, + UInt64Type, UInt8Type, }, }; -use super::{column::Column, record::DynRecord, record_ref::DynRecordRef, Datatype}; +use super::{record::DynRecord, record_ref::DynRecordRef, value::Value, Datatype}; use crate::{ inmem::immutable::{ArrowArrays, Builder}, - record::{Key, Record}, + record::{Key, Record, Schema}, timestamp::Timestamped, }; @@ -23,7 +23,7 @@ use crate::{ pub struct DynRecordImmutableArrays { _null: Arc, _ts: Arc, - columns: Vec, + columns: Vec, record_batch: arrow::record_batch::RecordBatch, } @@ -32,7 +32,7 @@ impl ArrowArrays for DynRecordImmutableArrays { type Builder = DynRecordBuilder; - fn builder(schema: &Arc, capacity: usize) -> Self::Builder { + fn builder(schema: Arc, capacity: usize) -> Self::Builder { let mut builders: Vec> = vec![]; let mut datatypes = vec![]; for field in schema.fields().iter().skip(2) { @@ -153,7 +153,7 @@ impl ArrowArrays for DynRecordImmutableArrays { .to_owned(), ), }; - columns.push(Column { + columns.push(Value { datatype, name, value, @@ -171,7 +171,7 @@ impl ArrowArrays for DynRecordImmutableArrays { } } impl DynRecordImmutableArrays { - fn primitive_value(col: &Column, offset: usize) -> T::Native + fn primitive_value(col: &Value, offset: usize) -> T::Native where T: ArrowPrimitiveType, { @@ -188,13 +188,13 @@ pub struct DynRecordBuilder { datatypes: Vec, _null: BooleanBufferBuilder, _ts: UInt32Builder, - schema: Arc, + schema: Arc, } impl Builder for DynRecordBuilder { fn push( &mut self, - key: Timestamped<<::Key as Key>::Ref<'_>>, + key: Timestamped<<<::Schema as Schema>::Key as Key>::Ref<'_>>, row: Option, ) { self._null.append(row.is_none()); @@ -466,7 +466,7 @@ impl Builder for DynRecordBuilder { Self::as_builder_mut::>(builder.as_mut()) .finish(), ); - columns.push(Column { + columns.push(Value { datatype: Datatype::UInt8, name: field.name().to_owned(), value: value.clone(), @@ -479,7 +479,7 @@ impl Builder for DynRecordBuilder { Self::as_builder_mut::>(builder.as_mut()) .finish(), ); - columns.push(Column { + columns.push(Value { datatype: Datatype::UInt16, name: field.name().to_owned(), value: value.clone(), @@ -492,7 +492,7 @@ impl Builder for DynRecordBuilder { Self::as_builder_mut::>(builder.as_mut()) .finish(), ); - columns.push(Column { + columns.push(Value { datatype: Datatype::UInt32, name: field.name().to_owned(), value: value.clone(), @@ -505,7 +505,7 @@ impl Builder for DynRecordBuilder { Self::as_builder_mut::>(builder.as_mut()) .finish(), ); - columns.push(Column { + columns.push(Value { datatype: Datatype::UInt64, name: field.name().to_owned(), value: value.clone(), @@ -518,7 +518,7 @@ impl Builder for DynRecordBuilder { Self::as_builder_mut::>(builder.as_mut()) .finish(), ); - columns.push(Column { + columns.push(Value { datatype: Datatype::Int8, name: field.name().to_owned(), value: value.clone(), @@ -531,7 +531,7 @@ impl Builder for DynRecordBuilder { Self::as_builder_mut::>(builder.as_mut()) .finish(), ); - columns.push(Column { + columns.push(Value { datatype: Datatype::Int16, name: field.name().to_owned(), value: value.clone(), @@ -544,7 +544,7 @@ impl Builder for DynRecordBuilder { Self::as_builder_mut::>(builder.as_mut()) .finish(), ); - columns.push(Column { + columns.push(Value { datatype: Datatype::Int32, name: field.name().to_owned(), value: value.clone(), @@ -557,7 +557,7 @@ impl Builder for DynRecordBuilder { Self::as_builder_mut::>(builder.as_mut()) .finish(), ); - columns.push(Column { + columns.push(Value { datatype: Datatype::Int64, name: field.name().to_owned(), value: value.clone(), @@ -568,7 +568,7 @@ impl Builder for DynRecordBuilder { Datatype::String => { let value = Arc::new(Self::as_builder_mut::(builder.as_mut()).finish()); - columns.push(Column { + columns.push(Value { datatype: Datatype::String, name: field.name().to_owned(), value: value.clone(), @@ -579,7 +579,7 @@ impl Builder for DynRecordBuilder { Datatype::Boolean => { let value = Arc::new(Self::as_builder_mut::(builder.as_mut()).finish()); - columns.push(Column { + columns.push(Value { datatype: Datatype::Boolean, name: field.name().to_owned(), value: value.clone(), @@ -592,7 +592,7 @@ impl Builder for DynRecordBuilder { Self::as_builder_mut::>(builder.as_mut()) .finish(), ); - columns.push(Column { + columns.push(Value { datatype: Datatype::Bytes, name: field.name().to_owned(), value: value.clone(), @@ -624,7 +624,7 @@ impl Builder for DynRecordBuilder { impl DynRecordBuilder { fn push_primary_key( &mut self, - key: Timestamped<<::Key as Key>::Ref<'_>>, + key: Timestamped<<<::Schema as Schema>::Key as Key>::Ref<'_>>, primary_key_index: usize, ) { let builder = self.builders.get_mut(primary_key_index).unwrap(); diff --git a/src/record/runtime/column.rs b/src/record/runtime/column.rs deleted file mode 100644 index 0b356a62..00000000 --- a/src/record/runtime/column.rs +++ /dev/null @@ -1,400 +0,0 @@ -use std::{any::Any, fmt::Debug, hash::Hash, sync::Arc}; - -use arrow::{ - array::{ - BooleanArray, GenericBinaryArray, Int16Array, Int32Array, Int64Array, Int8Array, - StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, - }, - datatypes::{DataType, Field}, -}; -use fusio::{SeqRead, Write}; - -use super::Datatype; -use crate::{ - record::{Key, KeyRef}, - serdes::{option::DecodeError, Decode, Encode}, -}; - -#[derive(Debug, Clone)] -pub struct ColumnDesc { - pub datatype: Datatype, - pub is_nullable: bool, - pub name: String, -} - -impl ColumnDesc { - pub fn new(name: String, datatype: Datatype, is_nullable: bool) -> Self { - Self { - name, - datatype, - is_nullable, - } - } -} - -#[derive(Clone)] -pub struct Column { - pub datatype: Datatype, - pub value: Arc, - pub is_nullable: bool, - pub name: String, -} - -impl Column { - pub fn new( - datatype: Datatype, - name: String, - value: Arc, - is_nullable: bool, - ) -> Self { - Self { - datatype, - name, - value, - is_nullable, - } - } - - pub(crate) fn with_none_value(datatype: Datatype, name: String, is_nullable: bool) -> Self { - match datatype { - Datatype::UInt8 => Self::new(datatype, name, Arc::>::new(None), is_nullable), - Datatype::UInt16 => { - Self::new(datatype, name, Arc::>::new(None), is_nullable) - } - Datatype::UInt32 => { - Self::new(datatype, name, Arc::>::new(None), is_nullable) - } - Datatype::UInt64 => { - Self::new(datatype, name, Arc::>::new(None), is_nullable) - } - Datatype::Int8 => Self::new(datatype, name, Arc::>::new(None), is_nullable), - Datatype::Int16 => { - Self::new(datatype, name, Arc::>::new(None), is_nullable) - } - Datatype::Int32 => { - Self::new(datatype, name, Arc::>::new(None), is_nullable) - } - Datatype::Int64 => { - Self::new(datatype, name, Arc::>::new(None), is_nullable) - } - Datatype::String => Self::new( - datatype, - name, - Arc::>::new(None), - is_nullable, - ), - Datatype::Boolean => { - Self::new(datatype, name, Arc::>::new(None), is_nullable) - } - Datatype::Bytes => Self::new( - datatype, - name, - Arc::>>::new(None), - is_nullable, - ), - } - } -} - -impl Eq for Column {} - -impl PartialOrd for Column { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -macro_rules! implement_col { - ([], $({$Type:ty, $Datatype:ident}), *) => { - impl Ord for Column { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - match self.datatype { - $( - Datatype::$Datatype => self - .value - .downcast_ref::<$Type>() - .cmp(&other.value.downcast_ref::<$Type>()), - )* - } - } - } - - impl PartialEq for Column { - fn eq(&self, other: &Self) -> bool { - self.datatype == other.datatype - && self.is_nullable == other.is_nullable - && match self.datatype { - $( - Datatype::$Datatype => self - .value - .downcast_ref::<$Type>() - .eq(&other.value.downcast_ref::<$Type>()), - )* - } - } - } - - impl Hash for Column { - fn hash(&self, state: &mut H) { - match self.datatype { - $( - Datatype::$Datatype => self.value.downcast_ref::<$Type>().hash(state), - )* - } - } - } - - impl Debug for Column { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut debug_struct = f.debug_struct("Column"); - match self.datatype { - $( - Datatype::$Datatype => { - debug_struct.field("datatype", &stringify!($Type)); - if let Some(value) = self.value.as_ref().downcast_ref::<$Type>() { - debug_struct.field("value", value); - } else { - debug_struct.field( - "value", - self.value.as_ref().downcast_ref::>().unwrap(), - ); - } - } - )* - } - debug_struct.field("nullable", &self.is_nullable).finish() - } - } - - }; -} - -macro_rules! implement_key_col { - ($({$Type:ident, $Datatype:ident, $Array:ident}), *) => { - impl Key for Column { - type Ref<'a> = Column; - - fn as_key_ref(&self) -> Self::Ref<'_> { - self.clone() - } - - fn to_arrow_datum(&self) -> Arc { - match self.datatype { - $( - Datatype::$Datatype => Arc::new($Array::new_scalar( - *self - .value - .as_ref() - .downcast_ref::<$Type>() - .expect(stringify!("unexpected datatype, expected " $Type)) - )), - )* - Datatype::String => Arc::new(StringArray::new_scalar( - self - .value - .as_ref() - .downcast_ref::() - .expect("unexpected datatype, expected String"), - )), - Datatype::Boolean => Arc::new(BooleanArray::new_scalar( - *self - .value - .as_ref() - .downcast_ref::() - .expect("unexpected datatype, expected bool"), - )), - Datatype::Bytes => Arc::new(GenericBinaryArray::::new_scalar( - self - .value - .as_ref() - .downcast_ref::>() - .expect("unexpected datatype, expected bytes"), - )), - } - } - } - } -} - -impl<'r> KeyRef<'r> for Column { - type Key = Column; - - fn to_key(self) -> Self::Key { - self - } -} - -macro_rules! implement_decode_col { - ([], $({$Type:ty, $Datatype:ident}), *) => { - impl Decode for Column { - type Error = fusio::Error; - - async fn decode(reader: &mut R) -> Result - where - R: SeqRead, - { - let tag = u8::decode(reader).await?; - let datatype = Self::tag_to_datatype(tag); - let is_nullable = bool::decode(reader).await?; - let is_some = !bool::decode(reader).await?; - let value = - match datatype { - $( - Datatype::$Datatype => match is_some { - true => Arc::new(Option::<$Type>::decode(reader).await.map_err( - |err| match err { - DecodeError::Io(error) => fusio::Error::Io(error), - DecodeError::Fusio(error) => error, - DecodeError::Inner(error) => fusio::Error::Other(Box::new(error)), - }, - )?) as Arc, - false => Arc::new(<$Type>::decode(reader).await?) as Arc, - }, - )* - }; - let name = String::decode(reader).await?; - Ok(Column { - datatype, - is_nullable, - name, - value, - }) - } - } - } -} - -macro_rules! implement_encode_col { - ([], $({$Type:ty, $Datatype:ident}), *) => { - impl Encode for Column { - type Error = fusio::Error; - - async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> - where - W: Write, - { - Self::tag(self.datatype).encode(writer).await?; - self.is_nullable.encode(writer).await?; - match self.datatype { - $( - Datatype::$Datatype => { - if let Some(value) = self.value.as_ref().downcast_ref::<$Type>() { - true.encode(writer).await?; - value.encode(writer).await? - } else { - false.encode(writer).await?; - self.value - .as_ref() - .downcast_ref::>() - .unwrap() - .encode(writer) - .await - .map_err(|err| fusio::Error::Other(Box::new(err)))?; - } - } - )* - }; - self.name.encode(writer).await?; - Ok(()) - } - - fn size(&self) -> usize { - 3 + self.name.size() + match self.datatype { - $( - Datatype::$Datatype => { - if let Some(value) = self.value.as_ref().downcast_ref::<$Type>() { - value.size() - } else { - self.value - .as_ref() - .downcast_ref::>() - .unwrap() - .size() - } - } - )* - } - } - } - } -} - -impl Column { - fn tag(datatype: Datatype) -> u8 { - match datatype { - Datatype::UInt8 => 0, - Datatype::UInt16 => 1, - Datatype::UInt32 => 2, - Datatype::UInt64 => 3, - Datatype::Int8 => 4, - Datatype::Int16 => 5, - Datatype::Int32 => 6, - Datatype::Int64 => 7, - Datatype::String => 8, - Datatype::Boolean => 9, - Datatype::Bytes => 10, - } - } - - fn tag_to_datatype(tag: u8) -> Datatype { - match tag { - 0 => Datatype::UInt8, - 1 => Datatype::UInt16, - 2 => Datatype::UInt32, - 3 => Datatype::UInt64, - 4 => Datatype::Int8, - 5 => Datatype::Int16, - 6 => Datatype::Int32, - 7 => Datatype::Int64, - 8 => Datatype::String, - 9 => Datatype::Boolean, - 10 => Datatype::Bytes, - _ => panic!("invalid datatype tag"), - } - } -} - -impl From<&Column> for Field { - fn from(col: &Column) -> Self { - match col.datatype { - Datatype::UInt8 => Field::new(&col.name, DataType::UInt8, col.is_nullable), - Datatype::UInt16 => Field::new(&col.name, DataType::UInt16, col.is_nullable), - Datatype::UInt32 => Field::new(&col.name, DataType::UInt32, col.is_nullable), - Datatype::UInt64 => Field::new(&col.name, DataType::UInt64, col.is_nullable), - Datatype::Int8 => Field::new(&col.name, DataType::Int8, col.is_nullable), - Datatype::Int16 => Field::new(&col.name, DataType::Int16, col.is_nullable), - Datatype::Int32 => Field::new(&col.name, DataType::Int32, col.is_nullable), - Datatype::Int64 => Field::new(&col.name, DataType::Int64, col.is_nullable), - Datatype::String => Field::new(&col.name, DataType::Utf8, col.is_nullable), - Datatype::Boolean => Field::new(&col.name, DataType::Boolean, col.is_nullable), - Datatype::Bytes => Field::new(&col.name, DataType::Binary, col.is_nullable), - } - } -} - -macro_rules! for_datatype { - ($macro:tt $(, $x:tt)*) => { - $macro! { - [$($x),*], - { u8, UInt8 }, - { u16, UInt16 }, - { u32, UInt32 }, - { u64, UInt64 }, - { i8, Int8 }, - { i16, Int16 }, - { i32, Int32 }, - { i64, Int64 }, - { String, String }, - { bool, Boolean }, - { Vec, Bytes } - } - }; -} - -implement_key_col!( - { u8, UInt8, UInt8Array }, { u16, UInt16, UInt16Array }, { u32, UInt32, UInt32Array }, { u64, UInt64, UInt64Array }, - { i8, Int8, Int8Array }, { i16, Int16, Int16Array }, { i32, Int32, Int32Array }, { i64, Int64, Int64Array } -); -for_datatype! { implement_col } -for_datatype! { implement_decode_col } -for_datatype! { implement_encode_col } diff --git a/src/record/runtime/mod.rs b/src/record/runtime/mod.rs index 90e8b304..8bba1c85 100644 --- a/src/record/runtime/mod.rs +++ b/src/record/runtime/mod.rs @@ -1,12 +1,12 @@ -mod array; -mod column; +pub(crate) mod array; mod record; mod record_ref; +mod value; use arrow::datatypes::DataType; -pub use column::*; pub use record::*; pub use record_ref::*; +pub use value::*; #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] pub enum Datatype { diff --git a/src/record/runtime/record.rs b/src/record/runtime/record.rs index 3c11eb6c..4902e4a7 100644 --- a/src/record/runtime/record.rs +++ b/src/record/runtime/record.rs @@ -1,58 +1,31 @@ -use std::{any::Any, collections::HashMap, sync::Arc}; +use std::{any::Any, sync::Arc}; -use arrow::datatypes::{DataType, Field, Schema}; use fusio::SeqRead; -use parquet::{format::SortingColumn, schema::types::ColumnPath}; -use super::{array::DynRecordImmutableArrays, Column, ColumnDesc, Datatype, DynRecordRef}; +use super::{Datatype, DynRecordRef, Value, ValueDesc}; use crate::{ - record::{Record, RecordDecodeError}, + record::{DynSchema, Record, RecordDecodeError}, serdes::{Decode, Encode}, }; #[derive(Debug)] pub struct DynRecord { - columns: Vec, + values: Vec, primary_index: usize, } #[allow(unused)] impl DynRecord { - pub fn new(columns: Vec, primary_index: usize) -> Self { + pub fn new(values: Vec, primary_index: usize) -> Self { Self { - columns, + values, primary_index, } } - - pub(crate) fn primary_key_index(&self) -> usize { - self.primary_index + 2 - } - - pub(crate) fn arrow_schema(&self) -> Arc { - let mut fields = vec![ - Field::new("_null", DataType::Boolean, false), - Field::new("_ts", DataType::UInt32, false), - ]; - - for (idx, col) in self.columns.iter().enumerate() { - if idx == self.primary_index && col.is_nullable { - panic!("Primary key must not be nullable") - } - let mut field = Field::from(col); - fields.push(field); - } - let mut metadata = HashMap::new(); - metadata.insert( - "primary_key_index".to_string(), - self.primary_index.to_string(), - ); - Arc::new(Schema::new_with_metadata(fields, metadata)) - } } impl DynRecord { - pub(crate) fn empty_record(column_descs: Vec, primary_index: usize) -> DynRecord { + pub(crate) fn empty_record(column_descs: Vec, primary_index: usize) -> DynRecord { let mut columns = vec![]; for desc in column_descs.iter() { let value: Arc = match desc.datatype { @@ -101,7 +74,7 @@ impl DynRecord { false => Arc::new(Vec::::default()), }, }; - columns.push(Column::new( + columns.push(Value::new( desc.datatype, desc.name.to_owned(), value, @@ -122,10 +95,10 @@ impl Decode for DynRecord { { let len = u32::decode(reader).await? as usize; let primary_index = u32::decode(reader).await? as usize; - let mut columns = vec![]; + let mut values = vec![]; // keep invariant for record: nullable --> Some(v); non-nullable --> v for i in 0..len { - let mut col = Column::decode(reader).await?; + let mut col = Value::decode(reader).await?; if i != primary_index && !col.is_nullable { match col.datatype { Datatype::UInt8 => { @@ -178,34 +151,24 @@ impl Decode for DynRecord { } } } - columns.push(col); + values.push(col); } Ok(DynRecord { - columns, + values, primary_index, }) } } impl Record for DynRecord { - type Columns = DynRecordImmutableArrays; - - type Key = Column; + type Schema = DynSchema; type Ref<'r> = DynRecordRef<'r>; - fn primary_key_index() -> usize { - unreachable!("This method is not used.") - } - - fn primary_key_path() -> (ColumnPath, Vec) { - unreachable!("This method is not used.") - } - fn as_record_ref(&self) -> Self::Ref<'_> { let mut columns = vec![]; - for (idx, col) in self.columns.iter().enumerate() { + for (idx, col) in self.values.iter().enumerate() { let datatype = col.datatype; let is_nullable = col.is_nullable; let mut value = col.value.clone(); @@ -255,7 +218,7 @@ impl Record for DynRecord { }; } - columns.push(Column::new( + columns.push(Value::new( datatype, col.name.to_owned(), value, @@ -265,12 +228,8 @@ impl Record for DynRecord { DynRecordRef::new(columns, self.primary_index) } - fn arrow_schema() -> &'static std::sync::Arc { - unreachable!("This method is not used.") - } - fn size(&self) -> usize { - self.columns.iter().fold(0, |acc, col| acc + col.size()) + self.values.iter().fold(0, |acc, col| acc + col.size()) } } @@ -279,19 +238,19 @@ pub(crate) mod test { use std::sync::Arc; use super::DynRecord; - use crate::record::{Column, ColumnDesc, Datatype}; + use crate::record::{Datatype, Value, ValueDesc}; #[allow(unused)] - pub(crate) fn test_dyn_item_schema() -> (Vec, usize) { + pub(crate) fn test_dyn_item_schema() -> (Vec, usize) { let descs = vec![ - ColumnDesc::new("id".to_string(), Datatype::Int64, false), - ColumnDesc::new("age".to_string(), Datatype::Int8, true), - ColumnDesc::new("height".to_string(), Datatype::Int16, true), - ColumnDesc::new("weight".to_string(), Datatype::Int32, false), - ColumnDesc::new("name".to_string(), Datatype::String, false), - ColumnDesc::new("email".to_string(), Datatype::String, true), - ColumnDesc::new("enabled".to_string(), Datatype::Boolean, false), - ColumnDesc::new("bytes".to_string(), Datatype::Bytes, true), + ValueDesc::new("id".to_string(), Datatype::Int64, false), + ValueDesc::new("age".to_string(), Datatype::Int8, true), + ValueDesc::new("height".to_string(), Datatype::Int16, true), + ValueDesc::new("weight".to_string(), Datatype::Int32, false), + ValueDesc::new("name".to_string(), Datatype::String, false), + ValueDesc::new("email".to_string(), Datatype::String, true), + ValueDesc::new("enabled".to_string(), Datatype::Boolean, false), + ValueDesc::new("bytes".to_string(), Datatype::Bytes, true), ]; (descs, 0) } @@ -301,44 +260,44 @@ pub(crate) mod test { let mut items = vec![]; for i in 0..50 { let mut columns = vec![ - Column::new(Datatype::Int64, "id".to_string(), Arc::new(i as i64), false), - Column::new( + Value::new(Datatype::Int64, "id".to_string(), Arc::new(i as i64), false), + Value::new( Datatype::Int8, "age".to_string(), Arc::new(Some(i as i8)), true, ), - Column::new( + Value::new( Datatype::Int16, "height".to_string(), Arc::new(Some(i as i16 * 20)), true, ), - Column::new( + Value::new( Datatype::Int32, "weight".to_string(), Arc::new(i * 200_i32), false, ), - Column::new( + Value::new( Datatype::String, "name".to_string(), Arc::new(i.to_string()), false, ), - Column::new( + Value::new( Datatype::String, "email".to_string(), Arc::new(Some(format!("{}@tonbo.io", i))), true, ), - Column::new( + Value::new( Datatype::Boolean, "enabled".to_string(), Arc::new(i % 2 == 0), false, ), - Column::new( + Value::new( Datatype::Bytes, "bytes".to_string(), Arc::new(Some(i.to_le_bytes().to_vec())), diff --git a/src/record/runtime/record_ref.rs b/src/record/runtime/record_ref.rs index a9525e8d..aa5f8bbf 100644 --- a/src/record/runtime/record_ref.rs +++ b/src/record/runtime/record_ref.rs @@ -3,28 +3,28 @@ use std::{any::Any, marker::PhantomData, mem, sync::Arc}; use arrow::{ array::{Array, ArrayRef, ArrowPrimitiveType, AsArray}, datatypes::{ - Int16Type, Int32Type, Int64Type, Int8Type, Schema, UInt16Type, UInt32Type, UInt64Type, - UInt8Type, + Int16Type, Int32Type, Int64Type, Int8Type, Schema as ArrowSchema, UInt16Type, UInt32Type, + UInt64Type, UInt8Type, }, }; use fusio::Write; -use super::{Column, Datatype, DynRecord}; +use super::{Datatype, DynRecord, Value}; use crate::{ - record::{internal::InternalRecordRef, Key, Record, RecordEncodeError, RecordRef}, + record::{internal::InternalRecordRef, Key, Record, RecordEncodeError, RecordRef, Schema}, serdes::Encode, }; #[derive(Clone)] pub struct DynRecordRef<'r> { - pub columns: Vec, + pub columns: Vec, // XXX: log encode should keep the same behavior pub primary_index: usize, _marker: PhantomData<&'r ()>, } impl<'r> DynRecordRef<'r> { - pub(crate) fn new(columns: Vec, primary_index: usize) -> Self { + pub(crate) fn new(columns: Vec, primary_index: usize) -> Self { Self { columns, primary_index, @@ -60,7 +60,7 @@ impl<'r> Encode for DynRecordRef<'r> { impl<'r> RecordRef<'r> for DynRecordRef<'r> { type Record = DynRecord; - fn key(self) -> <::Key as Key>::Ref<'r> { + fn key(self) -> <<::Schema as Schema>::Key as Key>::Ref<'r> { self.columns .get(self.primary_index) .cloned() @@ -71,7 +71,7 @@ impl<'r> RecordRef<'r> for DynRecordRef<'r> { record_batch: &'r arrow::array::RecordBatch, offset: usize, projection_mask: &'r parquet::arrow::ProjectionMask, - full_schema: &'r Arc, + full_schema: &'r Arc, ) -> InternalRecordRef<'r, Self> { let null = record_batch.column(0).as_boolean().value(offset); let metadata = full_schema.metadata(); @@ -98,7 +98,7 @@ impl<'r> RecordRef<'r> for DynRecordRef<'r> { .enumerate() .find(|(_idx, f)| field.contains(f)); if batch_field.is_none() { - columns.push(Column::with_none_value( + columns.push(Value::with_none_value( datatype, field.name().to_owned(), field.is_nullable(), @@ -197,7 +197,7 @@ impl<'r> RecordRef<'r> for DynRecordRef<'r> { } } }; - columns.push(Column::new( + columns.push(Value::new( datatype, field.name().to_owned(), value, diff --git a/src/record/test.rs b/src/record/test.rs index c9816956..effc185e 100644 --- a/src/record/test.rs +++ b/src/record/test.rs @@ -5,12 +5,12 @@ use arrow::{ Array, AsArray, BooleanArray, BooleanBufferBuilder, RecordBatch, StringArray, StringBuilder, UInt32Array, UInt32Builder, }, - datatypes::{DataType, Field, Schema, UInt32Type}, + datatypes::{DataType, Field, Schema as ArrowSchema, UInt32Type}, }; use once_cell::sync::Lazy; use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath}; -use super::{internal::InternalRecordRef, Key, Record, RecordRef}; +use super::{internal::InternalRecordRef, Key, Record, RecordRef, Schema}; use crate::{ inmem::immutable::{ArrowArrays, Builder}, timestamp::Timestamped, @@ -18,25 +18,33 @@ use crate::{ const PRIMARY_FIELD_NAME: &str = "vstring"; -impl Record for String { +#[derive(Debug)] +pub struct StringSchema; + +impl Schema for StringSchema { + type Record = String; + type Columns = StringColumns; - type Key = Self; + type Key = String; - type Ref<'r> - = &'r str - where - Self: 'r; + fn arrow_schema(&self) -> &Arc { + static SCHEMA: Lazy> = Lazy::new(|| { + Arc::new(ArrowSchema::new(vec![ + Field::new("_null", DataType::Boolean, false), + Field::new("_ts", DataType::UInt32, false), + Field::new(PRIMARY_FIELD_NAME, DataType::Utf8, false), + ])) + }); - fn key(&self) -> &str { - self + &SCHEMA } - fn primary_key_index() -> usize { + fn primary_key_index(&self) -> usize { 2 } - fn primary_key_path() -> (ColumnPath, Vec) { + fn primary_key_path(&self) -> (ColumnPath, Vec) { ( ColumnPath::new(vec!["_ts".to_string(), PRIMARY_FIELD_NAME.to_string()]), vec![ @@ -45,21 +53,22 @@ impl Record for String { ], ) } +} - fn as_record_ref(&self) -> Self::Ref<'_> { +impl Record for String { + type Schema = StringSchema; + + type Ref<'r> + = &'r str + where + Self: 'r; + + fn key(&self) -> &str { self } - fn arrow_schema() -> &'static Arc { - static SCHEMA: Lazy> = Lazy::new(|| { - Arc::new(Schema::new(vec![ - Field::new("_null", DataType::Boolean, false), - Field::new("_ts", DataType::UInt32, false), - Field::new(PRIMARY_FIELD_NAME, DataType::Utf8, false), - ])) - }); - - &SCHEMA + fn as_record_ref(&self) -> Self::Ref<'_> { + self } fn size(&self) -> usize { @@ -70,7 +79,7 @@ impl Record for String { impl<'r> RecordRef<'r> for &'r str { type Record = String; - fn key(self) -> <::Key as Key>::Ref<'r> { + fn key(self) -> <<::Schema as Schema>::Key as Key>::Ref<'r> { self } @@ -80,7 +89,7 @@ impl<'r> RecordRef<'r> for &'r str { record_batch: &'r RecordBatch, offset: usize, _: &'r ProjectionMask, - _: &'r Arc, + _: &'r Arc, ) -> InternalRecordRef<'r, Self> { let ts = record_batch .column(1) @@ -108,7 +117,7 @@ impl ArrowArrays for StringColumns { type Builder = StringColumnsBuilder; - fn builder(_schema: &Arc, capacity: usize) -> Self::Builder { + fn builder(_schema: Arc, capacity: usize) -> Self::Builder { StringColumnsBuilder { _null: BooleanBufferBuilder::new(capacity), _ts: UInt32Builder::with_capacity(capacity), @@ -166,8 +175,9 @@ impl Builder for StringColumnsBuilder { let _ts = Arc::new(self._ts.finish()); let string = Arc::new(self.string.finish()); + let schema = StringSchema; let record_batch = RecordBatch::try_new( - ::Record::arrow_schema().clone(), + schema.arrow_schema().clone(), vec![ Arc::clone(&_null) as Arc, Arc::clone(&_ts) as Arc, diff --git a/src/stream/level.rs b/src/stream/level.rs index 81c42b2f..61b2e71e 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -19,7 +19,7 @@ use ulid::Ulid; use crate::{ fs::{FileId, FileType}, ondisk::{scan::SsTableScan, sstable::SsTable}, - record::Record, + record::{Record, Schema}, scope::Scope, stream::record_batch::RecordBatchEntry, timestamp::Timestamp, @@ -47,8 +47,8 @@ pub(crate) struct LevelStream<'level, R> where R: Record, { - lower: Bound<&'level R::Key>, - upper: Bound<&'level R::Key>, + lower: Bound<&'level ::Key>, + upper: Bound<&'level ::Key>, ts: Timestamp, level: usize, option: Arc>, @@ -72,7 +72,10 @@ where level: usize, start: usize, end: usize, - range: (Bound<&'level R::Key>, Bound<&'level R::Key>), + range: ( + Bound<&'level ::Key>, + Bound<&'level ::Key>, + ), ts: Timestamp, limit: Option, projection_mask: ProjectionMask, diff --git a/src/transaction.rs b/src/transaction.rs index 0293d76f..aa2df175 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -246,8 +246,8 @@ mod tests { executor::tokio::TokioExecutor, fs::manager::StoreManager, record::{ - runtime::{Column, Datatype, DynRecord}, - ColumnDesc, + runtime::{Datatype, DynRecord, Value}, + ValueDesc, }, tests::{build_db, build_schema, Test}, transaction::CommitError, @@ -789,9 +789,9 @@ mod tests { #[tokio::test] async fn test_dyn_record() { let descs = vec![ - ColumnDesc::new("age".to_string(), Datatype::Int8, false), - ColumnDesc::new("height".to_string(), Datatype::Int16, true), - ColumnDesc::new("weight".to_string(), Datatype::Int32, false), + ValueDesc::new("age".to_string(), Datatype::Int8, false), + ValueDesc::new("height".to_string(), Datatype::Int16, true), + ValueDesc::new("weight".to_string(), Datatype::Int32, false), ]; let temp_dir = TempDir::new().unwrap(); @@ -806,14 +806,14 @@ mod tests { db.insert(DynRecord::new( vec![ - Column::new(Datatype::Int8, "age".to_string(), Arc::new(1_i8), false), - Column::new( + Value::new(Datatype::Int8, "age".to_string(), Arc::new(1_i8), false), + Value::new( Datatype::Int16, "height".to_string(), Arc::new(Some(180_i16)), true, ), - Column::new( + Value::new( Datatype::Int32, "weight".to_string(), Arc::new(56_i32), @@ -827,7 +827,7 @@ mod tests { let txn = db.transaction().await; { - let key = Column::new(Datatype::Int8, "age".to_string(), Arc::new(1_i8), false); + let key = Value::new(Datatype::Int8, "age".to_string(), Arc::new(1_i8), false); let record_ref = txn.get(&key, Projection::All).await.unwrap(); assert!(record_ref.is_some()); diff --git a/src/version/mod.rs b/src/version/mod.rs index d1dda197..28d77cf5 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -19,7 +19,7 @@ use tracing::error; use crate::{ fs::{manager::StoreManager, FileId, FileType}, ondisk::sstable::SsTable, - record::Record, + record::{Record, Schema}, scope::Scope, serdes::Encode, stream::{level::LevelStream, record_batch::RecordBatchEntry, ScanStream}, @@ -44,7 +44,7 @@ where R: Record, { ts: Timestamp, - pub(crate) level_slice: [Vec>; MAX_LEVEL], + pub(crate) level_slice: [Vec::Key>>; MAX_LEVEL], clean_sender: Sender, option: Arc>, timestamp: Arc, @@ -119,7 +119,7 @@ where pub(crate) async fn query( &self, manager: &StoreManager, - key: &TimestampedRef, + key: &TimestampedRef<::Key>, projection_mask: ProjectionMask, parquet_lru: ParquetLru, ) -> Result>, VersionError> { @@ -181,7 +181,7 @@ where async fn table_query( &self, store: &Arc, - key: &TimestampedRef<::Key>, + key: &TimestampedRef<::Key>, level: usize, gen: FileId, projection_mask: ProjectionMask, @@ -201,7 +201,10 @@ where .map_err(VersionError::Parquet) } - pub(crate) fn scope_search(key: &R::Key, level: &[Scope]) -> usize { + pub(crate) fn scope_search( + key: &::Key, + level: &[Scope<::Key>], + ) -> usize { level .binary_search_by(|scope| scope.min.cmp(key)) .unwrap_or_else(|index| index.saturating_sub(1)) @@ -216,7 +219,10 @@ where &self, manager: &StoreManager, streams: &mut Vec>, - range: (Bound<&'streams R::Key>, Bound<&'streams R::Key>), + range: ( + Bound<&'streams ::Key>, + Bound<&'streams ::Key>, + ), ts: Timestamp, limit: Option, projection_mask: ProjectionMask, @@ -291,7 +297,7 @@ where Ok(()) } - pub(crate) fn to_edits(&self) -> Vec> { + pub(crate) fn to_edits(&self) -> Vec::Key>> { let mut edits = Vec::new(); for (level, scopes) in self.level_slice.iter().enumerate() { @@ -325,7 +331,7 @@ where R: Record, { #[error("version encode error: {0}")] - Encode(#[source] ::Error), + Encode(#[source] <::Key as Encode>::Error), #[error("version io error: {0}")] Io(#[from] std::io::Error), #[error("version parquet error: {0}")] diff --git a/src/version/set.rs b/src/version/set.rs index 14fbe048..e4050d47 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -16,7 +16,7 @@ use futures_util::StreamExt; use super::{TransactionTs, MAX_LEVEL}; use crate::{ fs::{manager::StoreManager, parse_file_id, FileId, FileType}, - record::Record, + record::{Record, Schema}, serdes::Encode, timestamp::Timestamp, version::{cleaner::CleanTag, edit::VersionEdit, Version, VersionError, VersionRef}, @@ -176,7 +176,7 @@ where pub(crate) async fn apply_edits( &self, - mut version_edits: Vec>, + mut version_edits: Vec::Key>>, delete_gens: Option>, is_recover: bool, ) -> Result<(), VersionError> { diff --git a/src/wal/mod.rs b/src/wal/mod.rs index 3fdb9d60..83db336c 100644 --- a/src/wal/mod.rs +++ b/src/wal/mod.rs @@ -13,7 +13,7 @@ use thiserror::Error; use crate::{ fs::FileId, - record::{Key, Record}, + record::{Key, Record, Schema}, serdes::{Decode, Encode}, timestamp::Timestamped, wal::{log::LogType, record_entry::RecordEntry}, @@ -48,7 +48,7 @@ where pub(crate) async fn write<'r>( &mut self, log_ty: LogType, - key: Timestamped<::Ref<'r>>, + key: Timestamped<<::Key as Key>::Ref<'r>>, value: Option>, ) -> Result<(), as Encode>::Error> { let mut writer = HashWriter::new(&mut self.file); @@ -73,7 +73,7 @@ where &mut self, ) -> impl Stream< Item = Result< - (LogType, Timestamped, Option), + (LogType, Timestamped<::Key>, Option), RecoverError<::Error>, >, > + '_ { diff --git a/src/wal/record_entry.rs b/src/wal/record_entry.rs index 015ed317..07929553 100644 --- a/src/wal/record_entry.rs +++ b/src/wal/record_entry.rs @@ -1,7 +1,7 @@ use fusio::{SeqRead, Write}; use crate::{ - record::{Key, Record}, + record::{Key, Record, Schema}, serdes::{Decode, Encode}, timestamp::Timestamped, }; @@ -10,8 +10,13 @@ pub(crate) enum RecordEntry<'r, R> where R: Record, { - Encode((Timestamped<::Ref<'r>>, Option>)), - Decode((Timestamped, Option)), + Encode( + ( + Timestamped<<::Key as Key>::Ref<'r>>, + Option>, + ), + ), + Decode((Timestamped<::Key>, Option)), } impl Encode for RecordEntry<'_, R> @@ -51,7 +56,9 @@ where where R: SeqRead, { - let key = Timestamped::::decode(reader).await.unwrap(); + let key = Timestamped::<::Key>::decode(reader) + .await + .unwrap(); let record = Option::::decode(reader).await.unwrap(); Ok(RecordEntry::Decode((key, record))) diff --git a/tests/wasm.rs b/tests/wasm.rs index 2d0007f6..b700334f 100644 --- a/tests/wasm.rs +++ b/tests/wasm.rs @@ -7,7 +7,7 @@ mod tests { use futures::StreamExt; use tonbo::{ executor::opfs::OpfsExecutor, - record::{Column, ColumnDesc, Datatype, DynRecord, Record}, + record::{Datatype, DynRecord, Record, Value, ValueDesc}, DbOption, Projection, DB, }; use wasm_bindgen_test::wasm_bindgen_test;