Skip to content

Commit

Permalink
refactor: add Schema trait
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Nov 25, 2024
1 parent b509436 commit fe735bc
Show file tree
Hide file tree
Showing 21 changed files with 405 additions and 693 deletions.
10 changes: 5 additions & 5 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ pub(crate) mod tests {
executor::tokio::TokioExecutor,
fs::{manager::StoreManager, FileId, FileType},
inmem::{immutable::Immutable, mutable::Mutable},
record::{Column, ColumnDesc, Datatype, DynRecord, Record, RecordInstance},
record::{Datatype, DynRecord, Record, RecordInstance, Value, ValueDesc},
scope::Scope,
tests::Test,
timestamp::Timestamp,
Expand Down Expand Up @@ -712,15 +712,15 @@ pub(crate) mod tests {
.unwrap();

let empty_record = DynRecord::empty_record(
vec![ColumnDesc::new("id".to_owned(), Datatype::Int32, false)],
vec![ValueDesc::new("id".to_owned(), Datatype::Int32, false)],
0,
);
let instance = RecordInstance::Runtime(empty_record);

let mut batch1_data = vec![];
let mut batch2_data = vec![];
for i in 0..40 {
let col = Column::new(Datatype::Int32, "id".to_owned(), Arc::new(i), false);
let col = Value::new(Datatype::Int32, "id".to_owned(), Arc::new(i), false);
if i % 4 == 0 {
continue;
}
Expand Down Expand Up @@ -758,11 +758,11 @@ pub(crate) mod tests {
.unwrap();
assert_eq!(
scope.min,
Column::new(Datatype::Int32, "id".to_owned(), Arc::new(2), false)
Value::new(Datatype::Int32, "id".to_owned(), Arc::new(2), false)
);
assert_eq!(
scope.max,
Column::new(Datatype::Int32, "id".to_owned(), Arc::new(39), false)
Value::new(Datatype::Int32, "id".to_owned(), Arc::new(39), false)
);
}

Expand Down
112 changes: 79 additions & 33 deletions src/inmem/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use std::{
sync::Arc,
};

use arrow::{array::RecordBatch, datatypes::Schema};
use arrow::{array::RecordBatch, datatypes::Schema as ArrowSchema};
use crossbeam_skiplist::SkipMap;
use parquet::arrow::ProjectionMask;

use crate::{
record::{internal::InternalRecordRef, Key, Record, RecordInstance, RecordRef},
record::{internal::InternalRecordRef, Key, Record, RecordRef, Schema},
stream::record_batch::RecordBatchEntry,
timestamp::{Timestamp, Timestamped, TimestampedRef, EPOCH},
};
Expand All @@ -20,7 +20,7 @@ pub trait ArrowArrays: Sized + Sync {

type Builder: Builder<Self>;

fn builder(schema: &Arc<Schema>, capacity: usize) -> Self::Builder;
fn builder(schema: Arc<ArrowSchema>, capacity: usize) -> Self::Builder;

fn get(
&self,
Expand All @@ -37,7 +37,7 @@ where
{
fn push(
&mut self,
key: Timestamped<<<S::Record as Record>::Key as Key>::Ref<'_>>,
key: Timestamped<<<<S::Record as Record>::Schema as Schema>::Key as Key>::Ref<'_>>,
row: Option<<S::Record as Record>::Ref<'_>>,
);

Expand All @@ -51,26 +51,23 @@ where
A: ArrowArrays,
{
data: A,
index: BTreeMap<Timestamped<<A::Record as Record>::Key>, u32>,
index: BTreeMap<Timestamped<<<A::Record as Record>::Schema as Schema>::Key>, u32>,
}

impl<A>
From<(
SkipMap<Timestamped<<A::Record as Record>::Key>, Option<A::Record>>,
&RecordInstance,
)> for Immutable<A>
impl<A> Immutable<A>
where
A: ArrowArrays,
A::Record: Send,
{
fn from(
(mutable, instance): (
SkipMap<Timestamped<<A::Record as Record>::Key>, Option<A::Record>>,
&RecordInstance,
),
pub(crate) fn new(
mutable: SkipMap<
Timestamped<<<A::Record as Record>::Schema as Schema>::Key>,
Option<A::Record>,
>,
schema: Arc<ArrowSchema>,
) -> Self {
let mut index = BTreeMap::new();
let mut builder = A::builder(&instance.arrow_schema::<A::Record>(), mutable.len());
let mut builder = A::builder(schema, mutable.len());

for (offset, (key, value)) in mutable.into_iter().enumerate() {
builder.push(
Expand All @@ -93,8 +90,8 @@ where
pub(crate) fn scope(
&self,
) -> (
Option<&<A::Record as Record>::Key>,
Option<&<A::Record as Record>::Key>,
Option<&<<A::Record as Record>::Schema as Schema>::Key>,
Option<&<<A::Record as Record>::Schema as Schema>::Key>,
) {
(
self.index.first_key_value().map(|(key, _)| key.value()),
Expand All @@ -109,8 +106,8 @@ where
pub(crate) fn scan<'scan>(
&'scan self,
range: (
Bound<&'scan <A::Record as Record>::Key>,
Bound<&'scan <A::Record as Record>::Key>,
Bound<&'scan <<A::Record as Record>::Schema as Schema>::Key>,
Bound<&'scan <<A::Record as Record>::Schema as Schema>::Key>,
),
ts: Timestamp,
projection_mask: ProjectionMask,
Expand All @@ -128,14 +125,16 @@ where

let range = self
.index
.range::<TimestampedRef<<A::Record as Record>::Key>, _>((lower, upper));
.range::<TimestampedRef<<<A::Record as Record>::Schema as Schema>::Key>, _>((
lower, upper,
));

ImmutableScan::<A::Record>::new(range, self.data.as_record_batch(), projection_mask)
}

pub(crate) fn get(
&self,
key: &<A::Record as Record>::Key,
key: &<<A::Record as Record>::Schema as Schema>::Key,
ts: Timestamp,
projection_mask: ProjectionMask,
) -> Option<RecordBatchEntry<A::Record>> {
Expand All @@ -147,9 +146,13 @@ where
.next()
}

pub(crate) fn check_conflict(&self, key: &<A::Record as Record>::Key, ts: Timestamp) -> bool {
pub(crate) fn check_conflict(
&self,
key: &<<A::Record as Record>::Schema as Schema>::Key,
ts: Timestamp,
) -> bool {
self.index
.range::<TimestampedRef<<A::Record as Record>::Key>, _>((
.range::<TimestampedRef<<<A::Record as Record>::Schema as Schema>::Key>, _>((
Bound::Excluded(TimestampedRef::new(key, u32::MAX.into())),
Bound::Excluded(TimestampedRef::new(key, ts)),
))
Expand All @@ -162,7 +165,7 @@ pub struct ImmutableScan<'iter, R>
where
R: Record,
{
range: Range<'iter, Timestamped<R::Key>, u32>,
range: Range<'iter, Timestamped<<R::Schema as Schema>::Key>, u32>,
record_batch: &'iter RecordBatch,
projection_mask: ProjectionMask,
}
Expand All @@ -172,7 +175,7 @@ where
R: Record,
{
fn new(
range: Range<'iter, Timestamped<R::Key>, u32>,
range: Range<'iter, Timestamped<<R::Schema as Schema>::Key>, u32>,
record_batch: &'iter RecordBatch,
projection_mask: ProjectionMask,
) -> Self {
Expand Down Expand Up @@ -221,17 +224,61 @@ pub(crate) mod tests {
Array, BooleanArray, BooleanBufferBuilder, BooleanBuilder, PrimitiveBuilder,
RecordBatch, StringArray, StringBuilder, UInt32Array, UInt32Builder,
},
datatypes::{ArrowPrimitiveType, Schema, UInt32Type},
datatypes::{ArrowPrimitiveType, DataType, Field, Schema as ArrowSchema, UInt32Type},
};
use parquet::arrow::ProjectionMask;
use once_cell::sync::Lazy;
use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath};

use super::{ArrowArrays, Builder};
use crate::{
record::Record,
record::{Record, Schema},
tests::{Test, TestRef},
timestamp::timestamped::Timestamped,
};

/// Unit-struct [`Schema`] implementation for the `Test` record, used by the tests in this module.
pub struct TestSchema;

impl Schema for TestSchema {
    type Record = Test;

    type Columns = TestImmutableArrays;

    type Key = String;

    /// Returns the Arrow schema for `Test`.
    ///
    /// The schema lives in a function-local static that is initialised on first access, so
    /// every call hands back a reference to the same `Arc`.
    fn arrow_schema(&self) -> &Arc<ArrowSchema> {
        static SCHEMA: Lazy<Arc<ArrowSchema>> = Lazy::new(|| {
            // Two leading bookkeeping columns (`_null`, `_ts`) followed by the record's own
            // fields; only `vbool` is declared nullable.
            let fields = vec![
                Field::new("_null", DataType::Boolean, false),
                Field::new("_ts", DataType::UInt32, false),
                Field::new("vstring", DataType::Utf8, false),
                Field::new("vu32", DataType::UInt32, false),
                Field::new("vbool", DataType::Boolean, true),
            ];
            Arc::new(ArrowSchema::new(fields))
        });

        &*SCHEMA
    }

    /// Index of the primary-key column inside the Arrow schema: `2` is the position of
    /// `vstring` in the field list built by `arrow_schema`.
    fn primary_key_index(&self) -> usize {
        2
    }

    /// Returns the column path made of `_ts` and `vstring`, together with the sorting
    /// columns for schema indices 1 (`_ts`) and 2 (`vstring`).
    fn primary_key_path(
        &self,
    ) -> (
        parquet::schema::types::ColumnPath,
        Vec<parquet::format::SortingColumn>,
    ) {
        let path = ColumnPath::new(vec![String::from("_ts"), String::from("vstring")]);
        // Arguments are (column_idx, descending, nulls_first).
        let sorting = vec![
            SortingColumn::new(1, true, true),
            SortingColumn::new(2, false, true),
        ];
        (path, sorting)
    }
}

#[derive(Debug)]
pub struct TestImmutableArrays {
_null: Arc<BooleanArray>,
Expand All @@ -248,7 +295,7 @@ pub(crate) mod tests {

type Builder = TestBuilder;

fn builder(_schema: &Arc<Schema>, capacity: usize) -> Self::Builder {
fn builder(_schema: Arc<ArrowSchema>, capacity: usize) -> Self::Builder {
TestBuilder {
vstring: StringBuilder::with_capacity(capacity, 0),
vu32: PrimitiveBuilder::<UInt32Type>::with_capacity(capacity),
Expand Down Expand Up @@ -336,10 +383,9 @@ pub(crate) mod tests {
let vbool = Arc::new(self.vobool.finish());
let _null = Arc::new(BooleanArray::new(self._null.finish(), None));
let _ts = Arc::new(self._ts.finish());
let schema = TestSchema;
let mut record_batch = RecordBatch::try_new(
Arc::clone(
<<TestImmutableArrays as ArrowArrays>::Record as Record>::arrow_schema(),
),
Arc::clone(schema.arrow_schema()),
vec![
Arc::clone(&_null) as Arc<dyn Array>,
Arc::clone(&_ts) as Arc<dyn Array>,
Expand Down
Loading

0 comments on commit fe735bc

Please sign in to comment.