Skip to content

Commit

Permalink
refactor: replace log with fusio-log (#266)
Browse files Browse the repository at this point in the history
* refactor: replace log with fusio-log

* fix: add missing pattern in python

* chore: update fusio deps

---------

Co-authored-by: Gwo Tzu-Hsing <[email protected]>
  • Loading branch information
crwen and ethe authored Jan 26, 2025
1 parent ccb79bb commit c85dc13
Show file tree
Hide file tree
Showing 31 changed files with 360 additions and 1,149 deletions.
11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ version = "0.2.0"
msrv = "1.79.0"

[features]
aws = ["fusio-dispatch/aws", "fusio/aws"]
aws = ["fusio-dispatch/aws", "fusio/aws", "fusio-log/aws"]
bench = ["redb", "rocksdb", "sled"]
bytes = ["dep:bytes"]
datafusion = ["dep:async-trait", "dep:datafusion"]
Expand All @@ -24,6 +24,7 @@ object-store = ["fusio/object_store"]
opfs = [
"dep:wasm-bindgen-futures",
"fusio-dispatch/opfs",
"fusio-log/web",
"fusio-parquet/web",
"fusio/opfs",
]
Expand All @@ -33,6 +34,7 @@ rocksdb = ["dep:rocksdb"]
sled = ["dep:sled"]
tokio = [
"fusio-dispatch/tokio",
"fusio-log/tokio",
"fusio-parquet/tokio",
"fusio/tokio",
"parquet/default",
Expand Down Expand Up @@ -81,12 +83,13 @@ crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "42.2.0", optional = true }
flume = { version = "0.11", features = ["async"] }
fusio = { version = "0.3.4", features = [
fusio = { git = "https://github.com/tonbo-io/fusio", rev = "1fb503916a4945e5ff5fcc136f3d65e56375fb3d", version = "0.3.4", package = "fusio", features = [
"dyn",
"fs",
] }
fusio-dispatch = "0.3.4"
fusio-parquet = "0.3.4"
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio", rev = "1fb503916a4945e5ff5fcc136f3d65e56375fb3d", version = "0.3.4", package = "fusio-dispatch" }
fusio-log = { git = "https://github.com/tonbo-io/fusio", rev = "1fb503916a4945e5ff5fcc136f3d65e56375fb3d", version = "0.3.4", package = "fusio-log" , default-features = false, features = ["bytes"] }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio", rev = "1fb503916a4945e5ff5fcc136f3d65e56375fb3d", version = "0.3.4", package = "fusio-parquet" }
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ crate-type = ["cdylib"]
[workspace]

[dependencies]
fusio = { version = "0.3.4", features = [
fusio = { git = "https://github.com/tonbo-io/fusio", rev = "1fb503916a4945e5ff5fcc136f3d65e56375fb3d", version = "0.3.4", package = "fusio", features = [
"aws",
"tokio",
] }
fusio-dispatch = { version = "0.3.4", features = [
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio", rev = "1fb503916a4945e5ff5fcc136f3d65e56375fb3d", version = "0.3.4", package = "fusio-dispatch", features = [
"aws",
"tokio",
] }
Expand Down
1 change: 1 addition & 0 deletions bindings/python/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl From<DbError> for PyErr {
tonbo::DbError::Recover(err) => RecoverError::new_err(err.to_string()),
tonbo::DbError::WalWrite(err) => PyIOError::new_err(err.to_string()),
tonbo::DbError::ExceedsMaxLevel => ExceedsMaxLevelError::new_err("Exceeds max level"),
tonbo::DbError::Logger(err) => PyIOError::new_err(err.to_string()),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ where
Fusio(#[from] fusio::Error),
#[error("compaction version error: {0}")]
Version(#[from] VersionError<R>),
#[error("compaction logger error: {0}")]
Logger(#[from] fusio_log::error::LogError),
#[error("compaction channel is closed")]
ChannelClose,
#[error("database error: {0}")]
Expand Down
50 changes: 27 additions & 23 deletions src/inmem/mutable.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
use std::{intrinsics::transmute, ops::Bound, sync::Arc};
use std::{ops::Bound, sync::Arc};

use async_lock::Mutex;
use crossbeam_skiplist::{
map::{Entry, Range},
SkipMap,
};
use fusio::{buffered::BufWriter, DynFs, DynWrite};
use fusio::DynFs;

use crate::{
fs::{generate_file_id, FileId, FileType},
fs::{generate_file_id, FileId},
inmem::immutable::Immutable,
record::{Key, KeyRef, Record, Schema},
record::{KeyRef, Record, Schema},
timestamp::{
timestamped::{Timestamped, TimestampedRef},
Timestamp, EPOCH,
},
trigger::Trigger,
wal::{log::LogType, WalFile},
wal::{
log::{Log, LogType},
WalFile,
},
DbError, DbOption,
};

Expand All @@ -36,9 +39,8 @@ where
R: Record,
{
pub(crate) data: SkipMap<Timestamped<<R::Schema as Schema>::Key>, Option<R>>,
wal: Option<Mutex<WalFile<Box<dyn DynWrite>, R>>>,
wal: Option<Mutex<WalFile<R>>>,
pub(crate) trigger: Arc<dyn Trigger<R>>,

pub(super) schema: Arc<R::Schema>,
}

Expand All @@ -56,13 +58,15 @@ where
if option.use_wal {
let file_id = generate_file_id();

let file = Box::new(BufWriter::new(
fs.open_options(&option.wal_path(file_id), FileType::Wal.open_options(false))
.await?,
option.wal_buffer_size,
)) as Box<dyn DynWrite>;

wal = Some(Mutex::new(WalFile::new(file, file_id)));
wal = Some(Mutex::new(
WalFile::<R>::new(
fs.clone(),
option.wal_path(file_id),
option.wal_buffer_size,
file_id,
)
.await,
));
};

Ok(Self {
Expand Down Expand Up @@ -106,21 +110,18 @@ where
) -> Result<bool, DbError<R>> {
let timestamped_key = Timestamped::new(key, ts);

if let (Some(log_ty), Some(wal)) = (log_ty, &self.wal) {
let record_entry = Log::new(timestamped_key, value, log_ty);
if let (Some(_log_ty), Some(wal)) = (log_ty, &self.wal) {
let mut wal_guard = wal.lock().await;

wal_guard
.write(
log_ty,
timestamped_key.map(|key| unsafe { transmute(key.as_key_ref()) }),
value.as_ref().map(R::as_record_ref),
)
.write(&record_entry)
.await
.map_err(|e| DbError::WalWrite(Box::new(e)))?;
}

let is_exceeded = self.trigger.check_if_exceed(&value);
self.data.insert(timestamped_key, value);
let is_exceeded = self.trigger.check_if_exceed(&record_entry.value);
self.data.insert(record_entry.key, record_entry.value);

Ok(is_exceeded)
}
Expand Down Expand Up @@ -176,7 +177,10 @@ where

pub(crate) async fn into_immutable(
self,
) -> Result<(Option<FileId>, Immutable<<R::Schema as Schema>::Columns>), fusio::Error> {
) -> Result<
(Option<FileId>, Immutable<<R::Schema as Schema>::Columns>),
fusio_log::error::LogError,
> {
let mut file_id = None;

if let Some(wal) = self.wal {
Expand Down
89 changes: 46 additions & 43 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ mod ondisk;
pub mod option;
pub mod record;
mod scope;
pub mod serdes;
pub mod snapshot;
pub mod stream;
pub mod timestamp;
Expand All @@ -135,15 +134,14 @@ mod trigger;
mod version;
mod wal;

use std::{
collections::HashMap, io, io::Cursor, marker::PhantomData, mem, ops::Bound, pin::pin, sync::Arc,
};
use std::{collections::HashMap, io, marker::PhantomData, mem, ops::Bound, pin::pin, sync::Arc};

pub use arrow;
use async_lock::RwLock;
use async_stream::stream;
use flume::{bounded, Sender};
use fs::FileId;
use fusio_log::Decode;
use futures_core::Stream;
use futures_util::StreamExt;
use inmem::{immutable::Immutable, mutable::Mutable};
Expand All @@ -163,20 +161,19 @@ use tokio::sync::oneshot;
pub use tonbo_macros::{KeyAttributes, Record};
use tracing::error;
use transaction::{CommitError, Transaction, TransactionEntry};
use wal::log::Log;

pub use crate::option::*;
use crate::{
compaction::{CompactTask, CompactionError, Compactor},
executor::Executor,
fs::{manager::StoreManager, parse_file_id, FileType},
record::Schema,
serdes::Decode,
snapshot::Snapshot,
stream::{
mem_projection::MemProjectionStream, merge::MergeStream, package::PackageStream, Entry,
ScanStream,
},
timestamp::Timestamped,
trigger::{Trigger, TriggerFactory},
version::{cleaner::Cleaner, set::VersionSet, TransactionTs, Version, VersionError},
wal::{log::LogType, RecoverError, WalFile},
Expand Down Expand Up @@ -522,49 +519,53 @@ where
for wal_meta in wal_metas {
let wal_path = wal_meta.path;

let file = base_fs
.open_options(&wal_path, FileType::Wal.open_options(false))
.await?;
// SAFETY: wal_stream returns only file names
let wal_id = parse_file_id(&wal_path, FileType::Wal)?.unwrap();
let mut wal = WalFile::new(Cursor::new(file), wal_id);
wal_ids.push(wal_id);

let mut recover_stream = pin!(wal.recover());
let mut recover_stream =
pin!(WalFile::<R>::recover(option.base_fs.clone(), wal_path).await);
while let Some(record) = recover_stream.next().await {
let (log_type, Timestamped { ts, value: key }, value_option) = record?;

let is_excess = match log_type {
LogType::Full => {
schema
.recover_append(key, version_set.increase_ts(), value_option)
.await?
}
LogType::First => {
transaction_map.insert(ts, vec![(key, value_option)]);
false
}
LogType::Middle => {
transaction_map
.get_mut(&ts)
.unwrap()
.push((key, value_option));
false
}
LogType::Last => {
let mut is_excess = false;
let mut records = transaction_map.remove(&ts).unwrap();
records.push((key, value_option));

let ts = version_set.increase_ts();
for (key, value_option) in records {
is_excess = schema.recover_append(key, ts, value_option).await?;
let record_batch = record?;

for entry in record_batch {
let Log {
key,
value,
log_type,
} = entry;
let ts = key.ts;
let key = key.value;

let is_excess = match log_type.unwrap() {
LogType::Full => {
schema
.recover_append(key, version_set.increase_ts(), value)
.await?
}
is_excess
LogType::First => {
transaction_map.insert(ts, vec![(key, value)]);
false
}
LogType::Middle => {
transaction_map.get_mut(&ts).unwrap().push((key, value));
false
}
LogType::Last => {
let mut is_excess = false;
let mut records = transaction_map.remove(&ts).unwrap();
records.push((key, value));

let ts = version_set.increase_ts();
for (key, value_option) in records {
is_excess = schema.recover_append(key, ts, value_option).await?;
}
is_excess
}
};
if is_excess {
let _ = schema.compaction_tx.try_send(CompactTask::Freeze);
}
};
if is_excess {
let _ = schema.compaction_tx.try_send(CompactTask::Freeze);
}
}
}
Expand Down Expand Up @@ -882,6 +883,8 @@ where
WalWrite(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("exceeds the maximum level(0-6)")]
ExceedsMaxLevel,
#[error("write log error: {0}")]
Logger(#[from] fusio_log::error::LogError),
}

type LockMap<K> = Arc<LockableHashMap<K, ()>>;
Expand Down Expand Up @@ -909,6 +912,7 @@ pub(crate) mod tests {
use flume::{bounded, Receiver};
use fusio::{disk::TokioFs, path::Path, DynFs, SeqRead, Write};
use fusio_dispatch::FsOptions;
use fusio_log::{Decode, Encode};
use futures::StreamExt;
use parquet::arrow::ProjectionMask;
use parquet_lru::NoCache;
Expand All @@ -926,7 +930,6 @@ pub(crate) mod tests {
Datatype, DynRecord, Key, RecordDecodeError, RecordEncodeError, RecordRef,
Schema as RecordSchema, Value,
},
serdes::{Decode, Encode},
trigger::{TriggerFactory, TriggerType},
version::{cleaner::Cleaner, set::tests::build_version_set, Version},
wal::log::LogType,
Expand Down
3 changes: 1 addition & 2 deletions src/record/key/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ mod str;
use std::{hash::Hash, sync::Arc};

use arrow::array::Datum;

use crate::serdes::{Decode, Encode};
use fusio_log::{Decode, Encode};

pub trait Key:
'static + Encode + Decode + Ord + Clone + Send + Sync + Hash + std::fmt::Debug
Expand Down
6 changes: 2 additions & 4 deletions src/record/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@ pub(crate) mod test;
use std::{error::Error, fmt::Debug, io, sync::Arc};

use arrow::{array::RecordBatch, datatypes::Schema as ArrowSchema};
use fusio_log::{Decode, Encode};
use internal::InternalRecordRef;
pub use key::{Key, KeyRef};
use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath};
pub use runtime::*;
use thiserror::Error;

use crate::{
inmem::immutable::ArrowArrays,
serdes::{Decode, Encode},
};
use crate::inmem::immutable::ArrowArrays;

pub trait Schema: Debug + Send + Sync {
type Record: Record<Schema = Self>;
Expand Down
6 changes: 2 additions & 4 deletions src/record/runtime/record.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::sync::Arc;

use fusio::SeqRead;
use fusio_log::{Decode, Encode};

use super::{schema::DynSchema, Datatype, DynRecordRef, Value};
use crate::{
record::{Record, RecordDecodeError},
serdes::{Decode, Encode},
};
use crate::record::{Record, RecordDecodeError};

#[derive(Debug)]
pub struct DynRecord {
Expand Down
2 changes: 1 addition & 1 deletion src/record/runtime/record_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use arrow::{
},
};
use fusio::Write;
use fusio_log::Encode;

use super::{Datatype, DynRecord, Value};
use crate::{
magic::USER_COLUMN_OFFSET,
record::{internal::InternalRecordRef, Key, Record, RecordEncodeError, RecordRef, Schema},
serdes::Encode,
};

#[derive(Clone)]
Expand Down
Loading

0 comments on commit c85dc13

Please sign in to comment.