Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: replace log with fusio-log #266

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ version = "0.2.0"
msrv = "1.79.0"

[features]
aws = ["fusio-dispatch/aws", "fusio/aws"]
aws = ["fusio-dispatch/aws", "fusio/aws", "fusio-log/aws"]
bench = ["redb", "rocksdb", "sled"]
bytes = ["dep:bytes"]
datafusion = ["dep:async-trait", "dep:datafusion"]
Expand All @@ -24,6 +24,7 @@ object-store = ["fusio/object_store"]
opfs = [
"dep:wasm-bindgen-futures",
"fusio-dispatch/opfs",
"fusio-log/web",
"fusio-parquet/web",
"fusio/opfs",
]
Expand All @@ -33,6 +34,7 @@ rocksdb = ["dep:rocksdb"]
sled = ["dep:sled"]
tokio = [
"fusio-dispatch/tokio",
"fusio-log/tokio",
"fusio-parquet/tokio",
"fusio/tokio",
"parquet/default",
Expand Down Expand Up @@ -86,6 +88,7 @@ fusio = { version = "0.3.4", features = [
"fs",
] }
fusio-dispatch = "0.3.4"
fusio-log = {git = "https://github.com/tonbo-io/fusio-log", default-features = false, features = ["bytes"]}
fusio-parquet = "0.3.4"
futures-core = "0.3"
futures-io = "0.3"
Expand Down
1 change: 1 addition & 0 deletions bindings/python/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl From<DbError> for PyErr {
tonbo::DbError::Recover(err) => RecoverError::new_err(err.to_string()),
tonbo::DbError::WalWrite(err) => PyIOError::new_err(err.to_string()),
tonbo::DbError::ExceedsMaxLevel => ExceedsMaxLevelError::new_err("Exceeds max level"),
tonbo::DbError::Logger(err) => PyIOError::new_err(err.to_string()),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ where
Fusio(#[from] fusio::Error),
#[error("compaction version error: {0}")]
Version(#[from] VersionError<R>),
#[error("compaction logger error: {0}")]
Logger(#[from] fusio_log::error::LogError),
#[error("compaction channel is closed")]
ChannelClose,
#[error("database error: {0}")]
Expand Down
49 changes: 27 additions & 22 deletions src/inmem/mutable.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
use std::{intrinsics::transmute, ops::Bound, sync::Arc};
use std::{ops::Bound, sync::Arc};

use async_lock::Mutex;
use crossbeam_skiplist::{
map::{Entry, Range},
SkipMap,
};
use fusio::{buffered::BufWriter, DynFs, DynWrite};
use fusio::DynFs;

use crate::{
fs::{generate_file_id, FileId, FileType},
fs::{generate_file_id, FileId},
inmem::immutable::Immutable,
record::{Key, KeyRef, Record, Schema},
record::{KeyRef, Record, Schema},
timestamp::{
timestamped::{Timestamped, TimestampedRef},
Timestamp, EPOCH,
},
trigger::Trigger,
wal::{log::LogType, WalFile},
wal::{
log::{Log, LogType},
WalFile,
},
DbError, DbOption,
};

Expand All @@ -36,7 +39,7 @@ where
R: Record,
{
pub(crate) data: SkipMap<Timestamped<<R::Schema as Schema>::Key>, Option<R>>,
wal: Option<Mutex<WalFile<Box<dyn DynWrite>, R>>>,
wal: Option<Mutex<WalFile<R>>>,
pub(crate) trigger: Arc<Box<dyn Trigger<R> + Send + Sync>>,

pub(super) schema: Arc<R::Schema>,
Expand All @@ -56,13 +59,15 @@ where
if option.use_wal {
let file_id = generate_file_id();

let file = Box::new(BufWriter::new(
fs.open_options(&option.wal_path(file_id), FileType::Wal.open_options(false))
.await?,
option.wal_buffer_size,
)) as Box<dyn DynWrite>;

wal = Some(Mutex::new(WalFile::new(file, file_id)));
wal = Some(Mutex::new(
WalFile::<R>::new(
fs.clone(),
option.wal_path(file_id),
option.wal_buffer_size,
file_id,
)
.await,
));
};

Ok(Self {
Expand Down Expand Up @@ -106,21 +111,18 @@ where
) -> Result<bool, DbError<R>> {
let timestamped_key = Timestamped::new(key, ts);

if let (Some(log_ty), Some(wal)) = (log_ty, &self.wal) {
let record_entry = Log::new(timestamped_key, value, log_ty);
if let (Some(_log_ty), Some(wal)) = (log_ty, &self.wal) {
let mut wal_guard = wal.lock().await;

wal_guard
.write(
log_ty,
timestamped_key.map(|key| unsafe { transmute(key.as_key_ref()) }),
value.as_ref().map(R::as_record_ref),
)
.write(&record_entry)
.await
.map_err(|e| DbError::WalWrite(Box::new(e)))?;
}

let is_exceeded = self.trigger.item(&value);
self.data.insert(timestamped_key, value);
let is_exceeded = self.trigger.item(&record_entry.value);
self.data.insert(record_entry.key, record_entry.value);

Ok(is_exceeded)
}
Expand Down Expand Up @@ -176,7 +178,10 @@ where

pub(crate) async fn into_immutable(
self,
) -> Result<(Option<FileId>, Immutable<<R::Schema as Schema>::Columns>), fusio::Error> {
) -> Result<
(Option<FileId>, Immutable<<R::Schema as Schema>::Columns>),
fusio_log::error::LogError,
> {
let mut file_id = None;

if let Some(wal) = self.wal {
Expand Down
89 changes: 46 additions & 43 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ mod ondisk;
pub mod option;
pub mod record;
mod scope;
pub mod serdes;
pub mod snapshot;
pub mod stream;
pub mod timestamp;
Expand All @@ -135,15 +134,14 @@ mod trigger;
mod version;
mod wal;

use std::{
collections::HashMap, io, io::Cursor, marker::PhantomData, mem, ops::Bound, pin::pin, sync::Arc,
};
use std::{collections::HashMap, io, marker::PhantomData, mem, ops::Bound, pin::pin, sync::Arc};

pub use arrow;
use async_lock::RwLock;
use async_stream::stream;
use flume::{bounded, Sender};
use fs::FileId;
use fusio_log::Decode;
use futures_core::Stream;
use futures_util::StreamExt;
use inmem::{immutable::Immutable, mutable::Mutable};
Expand All @@ -163,20 +161,19 @@ use tokio::sync::oneshot;
pub use tonbo_macros::{KeyAttributes, Record};
use tracing::error;
use transaction::{CommitError, Transaction, TransactionEntry};
use wal::log::Log;

pub use crate::option::*;
use crate::{
compaction::{CompactTask, CompactionError, Compactor},
executor::Executor,
fs::{manager::StoreManager, parse_file_id, FileType},
record::Schema,
serdes::Decode,
snapshot::Snapshot,
stream::{
mem_projection::MemProjectionStream, merge::MergeStream, package::PackageStream, Entry,
ScanStream,
},
timestamp::Timestamped,
trigger::{Trigger, TriggerFactory},
version::{cleaner::Cleaner, set::VersionSet, TransactionTs, Version, VersionError},
wal::{log::LogType, RecoverError, WalFile},
Expand Down Expand Up @@ -522,49 +519,53 @@ where
for wal_meta in wal_metas {
let wal_path = wal_meta.path;

let file = base_fs
.open_options(&wal_path, FileType::Wal.open_options(false))
.await?;
// SAFETY: wal_stream returns only file names
let wal_id = parse_file_id(&wal_path, FileType::Wal)?.unwrap();
let mut wal = WalFile::new(Cursor::new(file), wal_id);
wal_ids.push(wal_id);

let mut recover_stream = pin!(wal.recover());
let mut recover_stream =
pin!(WalFile::<R>::recover(option.base_fs.clone(), wal_path).await);
while let Some(record) = recover_stream.next().await {
let (log_type, Timestamped { ts, value: key }, value_option) = record?;

let is_excess = match log_type {
LogType::Full => {
schema
.recover_append(key, version_set.increase_ts(), value_option)
.await?
}
LogType::First => {
transaction_map.insert(ts, vec![(key, value_option)]);
false
}
LogType::Middle => {
transaction_map
.get_mut(&ts)
.unwrap()
.push((key, value_option));
false
}
LogType::Last => {
let mut is_excess = false;
let mut records = transaction_map.remove(&ts).unwrap();
records.push((key, value_option));

let ts = version_set.increase_ts();
for (key, value_option) in records {
is_excess = schema.recover_append(key, ts, value_option).await?;
let record_batch = record?;

for entry in record_batch {
let Log {
key,
value,
log_type,
} = entry;
let ts = key.ts;
let key = key.value;

let is_excess = match log_type.unwrap() {
LogType::Full => {
schema
.recover_append(key, version_set.increase_ts(), value)
.await?
}
is_excess
LogType::First => {
transaction_map.insert(ts, vec![(key, value)]);
false
}
LogType::Middle => {
transaction_map.get_mut(&ts).unwrap().push((key, value));
false
}
LogType::Last => {
let mut is_excess = false;
let mut records = transaction_map.remove(&ts).unwrap();
records.push((key, value));

let ts = version_set.increase_ts();
for (key, value_option) in records {
is_excess = schema.recover_append(key, ts, value_option).await?;
}
is_excess
}
};
if is_excess {
let _ = schema.compaction_tx.try_send(CompactTask::Freeze);
}
};
if is_excess {
let _ = schema.compaction_tx.try_send(CompactTask::Freeze);
}
}
}
Expand Down Expand Up @@ -882,6 +883,8 @@ where
WalWrite(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("exceeds the maximum level(0-6)")]
ExceedsMaxLevel,
#[error("write log error: {0}")]
Logger(#[from] fusio_log::error::LogError),
}

type LockMap<K> = Arc<LockableHashMap<K, ()>>;
Expand Down Expand Up @@ -909,6 +912,7 @@ pub(crate) mod tests {
use flume::{bounded, Receiver};
use fusio::{disk::TokioFs, path::Path, DynFs, SeqRead, Write};
use fusio_dispatch::FsOptions;
use fusio_log::{Decode, Encode};
use futures::StreamExt;
use parquet::arrow::ProjectionMask;
use parquet_lru::NoCache;
Expand All @@ -926,7 +930,6 @@ pub(crate) mod tests {
Datatype, DynRecord, Key, RecordDecodeError, RecordEncodeError, RecordRef,
Schema as RecordSchema, Value,
},
serdes::{Decode, Encode},
trigger::{TriggerFactory, TriggerType},
version::{cleaner::Cleaner, set::tests::build_version_set, Version},
wal::log::LogType,
Expand Down
3 changes: 1 addition & 2 deletions src/record/key/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ mod str;
use std::{hash::Hash, sync::Arc};

use arrow::array::Datum;

use crate::serdes::{Decode, Encode};
use fusio_log::{Decode, Encode};

pub trait Key:
'static + Encode + Decode + Ord + Clone + Send + Sync + Hash + std::fmt::Debug
Expand Down
6 changes: 2 additions & 4 deletions src/record/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@ pub(crate) mod test;
use std::{error::Error, fmt::Debug, io, sync::Arc};

use arrow::{array::RecordBatch, datatypes::Schema as ArrowSchema};
use fusio_log::{Decode, Encode};
use internal::InternalRecordRef;
pub use key::{Key, KeyRef};
use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath};
pub use runtime::*;
use thiserror::Error;

use crate::{
inmem::immutable::ArrowArrays,
serdes::{Decode, Encode},
};
use crate::inmem::immutable::ArrowArrays;

pub trait Schema: Debug + Send + Sync {
type Record: Record<Schema = Self>;
Expand Down
6 changes: 2 additions & 4 deletions src/record/runtime/record.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::sync::Arc;

use fusio::SeqRead;
use fusio_log::{Decode, Encode};

use super::{schema::DynSchema, Datatype, DynRecordRef, Value};
use crate::{
record::{Record, RecordDecodeError},
serdes::{Decode, Encode},
};
use crate::record::{Record, RecordDecodeError};

#[derive(Debug)]
pub struct DynRecord {
Expand Down
2 changes: 1 addition & 1 deletion src/record/runtime/record_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use arrow::{
},
};
use fusio::Write;
use fusio_log::Encode;

use super::{Datatype, DynRecord, Value};
use crate::{
magic::USER_COLUMN_OFFSET,
record::{internal::InternalRecordRef, Key, Record, RecordEncodeError, RecordRef, Schema},
serdes::Encode,
};

#[derive(Clone)]
Expand Down
6 changes: 2 additions & 4 deletions src/record/runtime/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ use arrow::{
datatypes::{DataType, Field},
};
use fusio::{SeqRead, Write};
use fusio_log::{Decode, DecodeError, Encode};

use super::Datatype;
use crate::{
record::{Key, KeyRef},
serdes::{option::DecodeError, Decode, Encode},
};
use crate::record::{Key, KeyRef};

#[derive(Debug, Clone)]
pub struct ValueDesc {
Expand Down
Loading
Loading