diff --git a/Cargo.toml b/Cargo.toml index 95ea8af..5e4520f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,13 +11,14 @@ resolver = "2" version = "0.2.0" [package.metadata] -msrv = "1.81.0" +msrv = "1.82.0" [features] -bench = ["redb", "rocksdb", "sled"] +bench = ["redb", "rocksdb", "sled", "foyer"] bytes = ["dep:bytes"] datafusion = ["dep:async-trait", "dep:datafusion"] default = ["bytes", "tokio"] +foyer = ["tonbo_ext_reader/foyer"] load_tbl = [] redb = ["dep:redb"] rocksdb = ["dep:rocksdb"] @@ -58,7 +59,7 @@ crc32fast = "1" crossbeam-skiplist = "0.1" datafusion = { version = "42", optional = true } flume = { version = "0.11", features = ["async"] } -fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio", version = "0.3.1", features = [ +fusio = { package = "fusio", version = "0.3.3", features = [ "aws", "dyn", "fs", @@ -66,11 +67,11 @@ fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a9 "tokio", "tokio-http", ] } -fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-dispatch", version = "0.2.1", features = [ +fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [ "aws", "tokio", ] } -fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-parquet", version = "0.2.1" } +fusio-parquet = { package = "fusio-parquet", version = "0.2.2" } futures-core = "0.3" futures-io = "0.3" futures-util = "0.3" diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 0dd61b4..7365991 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -9,8 +9,8 @@ crate-type = ["cdylib"] [workspace] [dependencies] -fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio", version = "0.3.1", features = ["aws", "tokio"] } -fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-dispatch", version = "0.2.1", features = [ +fusio = { package = "fusio", version = "0.3.3", features = ["aws", "tokio"] } +fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [ "aws", "tokio", ] } diff --git a/bindings/python/src/db.rs b/bindings/python/src/db.rs index 0af488f..e640301 100644 --- a/bindings/python/src/db.rs +++ b/bindings/python/src/db.rs @@ -11,7 +11,7 @@ use tonbo::{ record::{ColumnDesc, DynRecord}, DB, }; -use tonbo_ext_reader::foyer_reader::FoyerReader; +use tonbo_ext_reader::lru_reader::LruReader; use crate::{ column::Column, error::{CommitError, DbError}, @@ -27,7 +27,7 @@ type PyExecutor = TokioExecutor; pub struct TonboDB { desc: Arc>, primary_key_index: usize, - db: Arc>, + db: Arc>, } #[pymethods] diff --git a/bindings/python/src/transaction.rs b/bindings/python/src/transaction.rs index 2c41c02..15e70a9 100644 --- a/bindings/python/src/transaction.rs +++ b/bindings/python/src/transaction.rs @@ -7,7 +7,7 @@ use pyo3::{ }; use pyo3_asyncio::tokio::future_into_py; use tonbo::{record::DynRecord, transaction, Projection}; -use tonbo_ext_reader::foyer_reader::FoyerReader; +use tonbo_ext_reader::lru_reader::LruReader; use crate::{ column::Column, error::{repeated_commit_err, CommitError, DbError}, @@ -18,14 +18,14 @@ use crate::{ #[pyclass] pub struct Transaction { - txn: Option>, + txn: Option>, desc: Arc>, primary_key_index: usize, } impl Transaction { pub(crate) fn new<'txn>( - txn: transaction::Transaction<'txn, DynRecord, FoyerReader>, + txn: transaction::Transaction<'txn, DynRecord, LruReader>, desc: Arc>, ) -> Self { let primary_key_index = desc @@ -37,8 +37,8 @@ impl Transaction { Transaction { txn: Some(unsafe { transmute::< - transaction::Transaction<'txn, DynRecord, FoyerReader>, - transaction::Transaction<'static, DynRecord, FoyerReader>, + transaction::Transaction<'txn, DynRecord, LruReader>, + transaction::Transaction<'static, DynRecord, LruReader>, >(txn) }), desc, @@ -84,8 +84,8 @@ impl Transaction { let txn = self.txn.as_ref().unwrap(); let txn = unsafe { transmute::< - &transaction::Transaction<'_, DynRecord, FoyerReader>, - &'static transaction::Transaction<'_, DynRecord, FoyerReader>, + &transaction::Transaction<'_, DynRecord, LruReader>, + &'static transaction::Transaction<'_, DynRecord, LruReader>, >(txn) }; @@ -169,8 +169,8 @@ impl Transaction { let txn = self.txn.as_ref().unwrap(); let txn = unsafe { transmute::< - &transaction::Transaction<'_, DynRecord, FoyerReader>, - &'static transaction::Transaction<'_, DynRecord, FoyerReader>, + &transaction::Transaction<'_, DynRecord, LruReader>, + &'static transaction::Transaction<'_, DynRecord, LruReader>, >(txn) }; let col_desc = self.desc.get(self.primary_key_index).unwrap(); diff --git a/rust-toolchain.toml b/rust-toolchain.toml index db16da3..704483a 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.81.0" +channel = "1.82.0" components = ["clippy", "rust-analyzer", "rustfmt"] diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 4edbf7b..65bc440 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -525,7 +525,7 @@ pub(crate) mod tests { use fusio_parquet::writer::AsyncWriter; use parquet::arrow::AsyncArrowWriter; use tempfile::TempDir; - use tonbo_ext_reader::{foyer_reader::FoyerReader, CacheReader}; + use tonbo_ext_reader::{lru_reader::LruReader, CacheReader}; use crate::{ compaction::Compactor, @@ -684,7 +684,7 @@ pub(crate) mod tests { .await .unwrap(); - let scope = Compactor::::minor_compaction( + let scope = Compactor::::minor_compaction( &option, None, &vec![ @@ -748,7 +748,7 @@ pub(crate) mod tests { .await .unwrap(); - let scope = Compactor::::minor_compaction( + let scope = Compactor::::minor_compaction( &option, None, &vec![ @@ -813,7 +813,7 @@ pub(crate) mod tests { let max = 5.to_string(); let mut version_edits = Vec::new(); - Compactor::::major_compaction( + Compactor::::major_compaction( &version, &option, &min, @@ -859,7 +859,7 @@ pub(crate) mod tests { manager: &StoreManager, ) -> ( (FileId, FileId, FileId, FileId, FileId), - Version, + Version, ) { let level_0_fs = option .level_fs_path(0) @@ -1070,7 +1070,7 @@ pub(crate) mod tests { .unwrap(); let (sender, _) = bounded(1); - let (meta_cache, range_cache) = FoyerReader::build_caches( + let (meta_cache, range_cache) = LruReader::build_caches( path_to_local(&option.cache_path).unwrap(), option.cache_meta_capacity, option.cache_meta_shards, @@ -1082,7 +1082,7 @@ pub(crate) mod tests { ) .await .unwrap(); - let mut version = Version::::new( + let mut version = Version::::new( option.clone(), sender, Arc::new(AtomicU32::default()), @@ -1205,7 +1205,7 @@ pub(crate) mod tests { let option = Arc::new(option); let (sender, _) = bounded(1); - let (meta_cache, range_cache) = FoyerReader::build_caches( + let (meta_cache, range_cache) = LruReader::build_caches( path_to_local(&option.cache_path).unwrap(), option.cache_meta_capacity, option.cache_meta_shards, @@ -1217,7 +1217,7 @@ pub(crate) mod tests { ) .await .unwrap(); - let mut version = Version::::new( + let mut version = Version::::new( option.clone(), sender, Arc::new(AtomicU32::default()), @@ -1241,7 +1241,7 @@ pub(crate) mod tests { let min = 6.to_string(); let max = 9.to_string(); - Compactor::::major_compaction( + Compactor::::major_compaction( &version, &option, &min, @@ -1270,7 +1270,7 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(5); - let db: DB = + let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); for i in 5..9 { diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index e5a1ea3..230baff 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -5,7 +5,7 @@ use crossbeam_skiplist::{ map::{Entry, Range}, SkipMap, }; -use fusio::{buffered::BufWriter, dynamic::DynFile, DynFs}; +use fusio::{buffered::BufWriter, DynFs, DynWrite}; use ulid::Ulid; use crate::{ @@ -37,7 +37,7 @@ where R: Record, { pub(crate) data: SkipMap, Option>, - wal: Option, R>>>, + wal: Option, R>>>, pub(crate) trigger: Arc + Send + Sync>>, } @@ -61,7 +61,7 @@ where ) .await?, option.wal_buffer_size, - )) as Box; + )) as Box; wal = Some(Mutex::new(WalFile::new(file, file_id))); }; diff --git a/src/lib.rs b/src/lib.rs index 6e7261c..818ad1d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,7 +38,7 @@ //! use tokio::fs; //! use tokio_util::bytes::Bytes; //! use tonbo::{executor::tokio::TokioExecutor, DbOption, Projection, Record, DB}; -//! use tonbo_ext_reader::foyer_reader::FoyerReader; +//! use tonbo_ext_reader::lru_reader::LruReader; //! //! // use macro to define schema of column family just like ORM //! // it provides type safety read & write API @@ -57,7 +57,7 @@ //! //! let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap()); //! // pluggable async runtime and I/O -//! let db: DB = +//! let db: DB = //! DB::new(options, TokioExecutor::default()).await.unwrap(); //! // insert with owned value //! db.insert(User { @@ -858,7 +858,7 @@ pub(crate) mod tests { use once_cell::sync::Lazy; use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath}; use tempfile::TempDir; - use tonbo_ext_reader::{foyer_reader::FoyerReader, CacheReader}; + use tonbo_ext_reader::{lru_reader::LruReader, CacheReader}; use tracing::error; use crate::{ @@ -1097,7 +1097,7 @@ pub(crate) mod tests { option: DbOption, executor: E, ) -> RecordBatch { - let db: DB = DB::new(option.clone(), executor).await.unwrap(); + let db: DB = DB::new(option.clone(), executor).await.unwrap(); let base_fs = db.manager.base_fs(); db.write( @@ -1534,7 +1534,7 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(/* max_mutable_len */ 5); - let db: DB = + let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); for (i, item) in test_items().into_iter().enumerate() { @@ -1571,7 +1571,7 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(/* max_mutable_len */ 50); - let db: DB = + let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); for item in &test_items()[0..10] { @@ -1621,7 +1621,7 @@ pub(crate) mod tests { schema.flush_wal().await.unwrap(); drop(schema); - let db: DB = + let db: DB = DB::new(option.as_ref().to_owned(), TokioExecutor::new()) .await .unwrap(); @@ -1694,7 +1694,7 @@ pub(crate) mod tests { "id".to_owned(), primary_key_index, ); - let db: DB = + let db: DB = DB::with_schema(option, TokioExecutor::new(), desc, primary_key_index) .await .unwrap(); @@ -1734,7 +1734,7 @@ pub(crate) mod tests { option.major_threshold_with_sst_size = 3; option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(5); - let db: DB = + let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); for (idx, item) in test_items().into_iter().enumerate() { @@ -1777,7 +1777,7 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(5); - let db: DB = + let db: DB = DB::with_schema(option, TokioExecutor::new(), cols_desc, primary_key_index) .await .unwrap(); @@ -2007,7 +2007,7 @@ pub(crate) mod tests { option3.major_default_oldest_table_num = 1; option3.trigger_type = TriggerType::Length(5); - let db1: DB = DB::with_schema( + let db1: DB = DB::with_schema( option, TokioExecutor::new(), cols_desc.clone(), @@ -2015,7 +2015,7 @@ pub(crate) mod tests { ) .await .unwrap(); - let db2: DB = DB::with_schema( + let db2: DB = DB::with_schema( option2, TokioExecutor::new(), cols_desc.clone(), @@ -2023,7 +2023,7 @@ pub(crate) mod tests { ) .await .unwrap(); - let db3: DB = + let db3: DB = DB::with_schema(option3, TokioExecutor::new(), cols_desc, primary_key_index) .await .unwrap(); diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index abb3200..8d0f5cd 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -132,7 +132,7 @@ pub(crate) mod tests { basic::{Compression, ZstdLevel}, file::properties::WriterProperties, }; - use tonbo_ext_reader::{foyer_reader::FoyerReader, CacheReader}; + use tonbo_ext_reader::{lru_reader::LruReader, CacheReader}; use super::SsTable; use crate::{ @@ -234,7 +234,7 @@ pub(crate) mod tests { { let test_ref_1 = - open_sstable::(base_fs, table_path.clone(), table_gen, &option) + open_sstable::(base_fs, table_path.clone(), table_gen, &option) .await .get( key.borrow(), @@ -252,7 +252,7 @@ pub(crate) mod tests { } { let test_ref_2 = - open_sstable::(base_fs, table_path.clone(), table_gen, &option) + open_sstable::(base_fs, table_path.clone(), table_gen, &option) .await .get( key.borrow(), @@ -270,7 +270,7 @@ pub(crate) mod tests { } { let test_ref_3 = - open_sstable::(base_fs, table_path.clone(), table_gen, &option) + open_sstable::(base_fs, table_path.clone(), table_gen, &option) .await .get( key.borrow(), @@ -310,7 +310,7 @@ pub(crate) mod tests { { let mut test_ref_1 = - open_sstable::(base_fs, table_path.clone(), table_gen, &option) + open_sstable::(base_fs, table_path.clone(), table_gen, &option) .await .scan( (Bound::Unbounded, Bound::Unbounded), @@ -336,7 +336,7 @@ pub(crate) mod tests { } { let mut test_ref_2 = - open_sstable::(base_fs, table_path.clone(), table_gen, &option) + open_sstable::(base_fs, table_path.clone(), table_gen, &option) .await .scan( (Bound::Unbounded, Bound::Unbounded), @@ -362,7 +362,7 @@ pub(crate) mod tests { } { let mut test_ref_3 = - open_sstable::(base_fs, table_path.clone(), table_gen, &option) + open_sstable::(base_fs, table_path.clone(), table_gen, &option) .await .scan( (Bound::Unbounded, Bound::Unbounded), diff --git a/src/stream/mem_projection.rs b/src/stream/mem_projection.rs index 9206294..230cc8b 100644 --- a/src/stream/mem_projection.rs +++ b/src/stream/mem_projection.rs @@ -68,7 +68,7 @@ mod tests { use fusio::{disk::TokioFs, path::Path, DynFs}; use futures_util::StreamExt; use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask}; - use tonbo_ext_reader::foyer_reader::FoyerReader; + use tonbo_ext_reader::lru_reader::LruReader; use crate::{ inmem::mutable::Mutable, record::Record, stream::mem_projection::MemProjectionStream, @@ -129,7 +129,7 @@ mod tests { vec![0, 1, 2, 4], ); - let mut stream = MemProjectionStream::::new( + let mut stream = MemProjectionStream::::new( mutable .scan((Bound::Unbounded, Bound::Unbounded), 6.into()) .into(), diff --git a/src/stream/merge.rs b/src/stream/merge.rs index 7d144a6..f086d80 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -164,7 +164,7 @@ mod tests { use fusio::{disk::TokioFs, path::Path, DynFs}; use futures_util::StreamExt; - use tonbo_ext_reader::foyer_reader::FoyerReader; + use tonbo_ext_reader::lru_reader::LruReader; use super::MergeStream; use crate::{ @@ -217,7 +217,7 @@ mod tests { let lower = "a".to_string(); let upper = "e".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = MergeStream::::from_vec( + let mut merge = MergeStream::::from_vec( vec![ m1.scan(bound, 6.into()).into(), m2.scan(bound, 6.into()).into(), @@ -296,7 +296,7 @@ mod tests { let lower = "1".to_string(); let upper = "4".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = MergeStream::::from_vec( + let mut merge = MergeStream::::from_vec( vec![m1.scan(bound, 0.into()).into()], 0.into(), ) @@ -326,7 +326,7 @@ mod tests { let lower = "1".to_string(); let upper = "4".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = MergeStream::::from_vec( + let mut merge = MergeStream::::from_vec( vec![m1.scan(bound, 1.into()).into()], 1.into(), ) @@ -380,7 +380,7 @@ mod tests { let lower = "1".to_string(); let upper = "3".to_string(); { - let mut merge = MergeStream::::from_vec( + let mut merge = MergeStream::::from_vec( vec![m1 .scan((Bound::Included(&lower), Bound::Included(&upper)), 0.into()) .into()], @@ -400,7 +400,7 @@ mod tests { assert!(merge.next().await.is_none()); } { - let mut merge = MergeStream::::from_vec( + let mut merge = MergeStream::::from_vec( vec![m1 .scan((Bound::Included(&lower), Bound::Included(&upper)), 0.into()) .into()], diff --git a/src/stream/package.rs b/src/stream/package.rs index e1a4f20..5c63f78 100644 --- a/src/stream/package.rs +++ b/src/stream/package.rs @@ -89,7 +89,7 @@ mod tests { use fusio::{disk::TokioFs, path::Path, DynFs}; use futures_util::StreamExt; use tempfile::TempDir; - use tonbo_ext_reader::foyer_reader::FoyerReader; + use tonbo_ext_reader::lru_reader::LruReader; use crate::{ inmem::{ @@ -182,7 +182,7 @@ mod tests { .await .unwrap(); - let merge = MergeStream::::from_vec( + let merge = MergeStream::::from_vec( vec![m1 .scan((Bound::Unbounded, Bound::Unbounded), 6.into()) .into()], diff --git a/src/transaction.rs b/src/transaction.rs index 8e85b34..3e9333d 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -260,7 +260,7 @@ mod tests { use fusio_dispatch::FsOptions; use futures_util::StreamExt; use tempfile::TempDir; - use tonbo_ext_reader::foyer_reader::FoyerReader; + use tonbo_ext_reader::lru_reader::LruReader; use crate::{ compaction::tests::build_version, @@ -280,7 +280,7 @@ mod tests { async fn transaction_read_write() { let temp_dir = TempDir::new().unwrap(); - let db = DB::::new( + let db = DB::::new( DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), TokioExecutor::new(), ) @@ -408,7 +408,7 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); - let db = DB::::new(option, TokioExecutor::new()) + let db = DB::::new(option, TokioExecutor::new()) .await .unwrap(); @@ -441,7 +441,7 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); - let db = DB::::new(option, TokioExecutor::new()) + let db = DB::::new(option, TokioExecutor::new()) .await .unwrap(); @@ -811,7 +811,7 @@ mod tests { "age".to_string(), 0, ); - let db = DB::<_, _, FoyerReader>::with_schema(option, TokioExecutor::default(), descs, 0) + let db = DB::<_, _, LruReader>::with_schema(option, TokioExecutor::default(), descs, 0) .await .unwrap(); diff --git a/src/version/set.rs b/src/version/set.rs index 93d16d0..05a9cf8 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -322,7 +322,7 @@ pub(crate) mod tests { use fusio_dispatch::FsOptions; use futures_util::StreamExt; use tempfile::TempDir; - use tonbo_ext_reader::{foyer_reader::FoyerReader, CacheReader}; + use tonbo_ext_reader::{lru_reader::LruReader, CacheReader}; use crate::{ fs::{manager::StoreManager, FileId, FileType}, @@ -396,7 +396,7 @@ pub(crate) mod tests { .await .unwrap(); - let version_set: VersionSet = + let version_set: VersionSet = VersionSet::new(sender.clone(), option.clone(), manager.clone()) .await .unwrap(); @@ -412,7 +412,7 @@ pub(crate) mod tests { drop(version_set); - let version_set: VersionSet = + let version_set: VersionSet = VersionSet::new(sender.clone(), option.clone(), manager) .await .unwrap(); @@ -434,7 +434,7 @@ pub(crate) mod tests { .await .unwrap(); - let version_set: VersionSet = + let version_set: VersionSet = VersionSet::new(sender.clone(), option.clone(), manager.clone()) .await .unwrap(); @@ -562,7 +562,7 @@ pub(crate) mod tests { .await .unwrap(); - let version_set: VersionSet = + let version_set: VersionSet = VersionSet::new(sender.clone(), option.clone(), manager) .await .unwrap(); diff --git a/tonbo_ext_reader/Cargo.toml b/tonbo_ext_reader/Cargo.toml index 1ebe334..5b9bb4b 100644 --- a/tonbo_ext_reader/Cargo.toml +++ b/tonbo_ext_reader/Cargo.toml @@ -4,6 +4,9 @@ name = "tonbo_ext_reader" version = "0.1.0" edition = "2021" +[features] +foyer = [] + [dependencies] anyhow = "1" arrow = "53" @@ -11,7 +14,7 @@ bytes = { version = "1.7", features = ["serde"] } foyer = { version = "0.12" } futures-core = "0.3" futures-util = "0.3" -fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-parquet", version = "0.2.1" } +fusio-parquet = { package = "fusio-parquet", version = "0.2.2" } lru = "0.12" parking_lot = "0.12" parquet = { version = "53", features = ["async"] } @@ -19,7 +22,7 @@ thiserror = "1" ulid = { version = "1", features = ["serde"] } [dev-dependencies] -fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio", version = "0.3.1", features = [ +fusio = { package = "fusio", version = "0.3.3", features = [ "aws", "dyn", "fs", @@ -27,8 +30,8 @@ fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a9 "tokio", "tokio-http", ] } -fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-dispatch", version = "0.2.1", features = [ +fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [ "tokio", ] } tempfile = "3" -tokio = { version = "1", features = ["full"] } \ No newline at end of file +tokio = { version = "1", features = ["full"] } diff --git a/tonbo_ext_reader/src/foyer_reader.rs b/tonbo_ext_reader/src/foyer_reader.rs index 2411acc..70090ba 100644 --- a/tonbo_ext_reader/src/foyer_reader.rs +++ b/tonbo_ext_reader/src/foyer_reader.rs @@ -10,7 +10,7 @@ use futures_util::FutureExt; use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData}; use ulid::Ulid; -use crate::{CacheError, CacheReader, MetaCache, RangeCache}; +use crate::{CacheError, CacheReader, TonboCache}; #[derive(Debug, Clone)] pub struct FoyerMetaCache(Cache>); @@ -24,9 +24,9 @@ pub struct FoyerReader { meta_cache: FoyerMetaCache, } -impl MetaCache for FoyerMetaCache { - fn get(&self, gen: &Ulid) -> Option> { - self.0.get(gen).map(|entry| entry.value().clone()) +impl TonboCache> for FoyerMetaCache { + async fn get(&self, gen: &Ulid) -> Result>, CacheError> { + Ok(self.0.get(gen).map(|entry| entry.value().clone())) } fn insert(&self, gen: Ulid, data: Arc) -> Arc { @@ -34,7 +34,7 @@ impl MetaCache for FoyerMetaCache { } } -impl RangeCache for FoyerRangeCache { +impl TonboCache<(Ulid, Range), Bytes> for FoyerRangeCache { async fn get(&self, key: &(Ulid, Range)) -> Result, CacheError> { Ok(self.0.get(key).await?.map(|entry| entry.value().clone())) } @@ -112,7 +112,12 @@ impl AsyncFileReader for FoyerReader { fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { async move { - if let Some(meta) = self.meta_cache.get(&self.gen) { + if let Some(meta) = self + .meta_cache + .get(&self.gen) + .await + .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))? + { return Ok(meta); } diff --git a/tonbo_ext_reader/src/lib.rs b/tonbo_ext_reader/src/lib.rs index 1b738d3..81fdfbd 100644 --- a/tonbo_ext_reader/src/lib.rs +++ b/tonbo_ext_reader/src/lib.rs @@ -6,27 +6,22 @@ use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaD use thiserror::Error; use ulid::Ulid; +#[cfg(feature = "foyer")] pub mod foyer_reader; pub mod lru_reader; -pub trait MetaCache: Sync + Send + Clone + Debug { - fn get(&self, gen: &Ulid) -> Option>; - - fn insert(&self, gen: Ulid, data: Arc) -> Arc; -} - -pub trait RangeCache: Sync + Send + Clone + Debug { +pub trait TonboCache: Sync + Send + Clone + Debug { fn get( &self, - key: &(Ulid, Range), - ) -> impl std::future::Future, CacheError>> + Send; + key: &K, + ) -> impl std::future::Future, CacheError>> + Send; - fn insert(&self, key: (Ulid, Range), bytes: Bytes) -> Bytes; + fn insert(&self, key: K, value: V) -> V; } pub trait CacheReader: AsyncFileReader + Unpin { - type MetaCache: MetaCache; - type RangeCache: RangeCache; + type MetaCache: TonboCache>; + type RangeCache: TonboCache<(Ulid, Range), Bytes>; fn new( meta_cache: Self::MetaCache, @@ -37,14 +32,14 @@ pub trait CacheReader: AsyncFileReader + Unpin { #[allow(clippy::too_many_arguments)] fn build_caches( - cache_path: impl AsRef + Send, - cache_meta_capacity: usize, - cache_meta_shards: usize, - cache_meta_ratio: f64, - cache_range_memory: usize, - cache_range_disk: usize, - cache_range_capacity: usize, - cache_range_shards: usize, + path: impl AsRef + Send, + meta_capacity: usize, + meta_shards: usize, + meta_ratio: f64, + range_memory: usize, + range_disk: usize, + range_capacity: usize, + range_shards: usize, ) -> impl std::future::Future> + Send; } @@ -80,7 +75,7 @@ pub(crate) mod tests { use tempfile::TempDir; use ulid::Ulid; - use crate::{foyer_reader::FoyerReader, lru_reader::LruReader, CacheReader}; + use crate::{lru_reader::LruReader, CacheReader}; struct CountFile { inner: Box, @@ -213,7 +208,8 @@ pub(crate) mod tests { #[tokio::test] async fn test_cache_read() { - inner_test_cache_read::().await; + #[cfg(feature = "foyer")] + inner_test_cache_read::().await; inner_test_cache_read::().await; } } diff --git a/tonbo_ext_reader/src/lru_reader.rs b/tonbo_ext_reader/src/lru_reader.rs index e3f4a5d..4a2cbc7 100644 --- a/tonbo_ext_reader/src/lru_reader.rs +++ b/tonbo_ext_reader/src/lru_reader.rs @@ -15,7 +15,7 @@ use parking_lot::Mutex; use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData}; use ulid::Ulid; -use crate::{CacheError, CacheReader, MetaCache, RangeCache}; +use crate::{CacheError, CacheReader, TonboCache}; pub(crate) trait SharedKey: Hash + PartialEq + Eq { fn shared(&self, hash_builder: &S, shared: usize) -> usize; @@ -96,9 +96,9 @@ pub struct LruReader { meta_cache: LruMetaCache, } -impl MetaCache for LruMetaCache { - fn get(&self, gen: &Ulid) -> Option> { - self.0.get(gen, |v| v.map(Arc::clone)) +impl TonboCache> for LruMetaCache { + async fn get(&self, gen: &Ulid) -> Result>, CacheError> { + Ok(self.0.get(gen, |v| v.map(Arc::clone))) } fn insert(&self, gen: Ulid, data: Arc) -> Arc { @@ -107,7 +107,7 @@ impl MetaCache for LruMetaCache { } } -impl RangeCache for LruRangeCache { +impl TonboCache<(Ulid, Range), Bytes> for LruRangeCache { async fn get(&self, key: &(Ulid, Range)) -> Result, CacheError> { Ok(self.0.get(key, |v| v.cloned())) } @@ -139,7 +139,12 @@ impl AsyncFileReader for LruReader { fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { async move { - if let Some(meta) = self.meta_cache.get(&self.gen) { + if let Some(meta) = self + .meta_cache + .get(&self.gen) + .await + .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))? + { return Ok(meta); }