Skip to content

Commit

Permalink
Fix missing shrink at the end of seqno
Browse files Browse the repository at this point in the history
  • Loading branch information
gostkin committed Oct 30, 2023
1 parent 572af17 commit f28fedc
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 4 deletions.
6 changes: 4 additions & 2 deletions fraos/src/growable_mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ impl GrowableMmap {
}
}

if file_length > 0 {
let upper_cap = existing_length.unwrap_or(file_length);
let upper_cap = existing_length.unwrap_or(file_length);

if upper_cap > 0 {
let mmap = SharedMmap::new(
unsafe { MmapOptions::new().offset(0).len(upper_cap).map(file) }
.map_err(|err| FraosError::MmapError(MmapError::Mmap(err)))?,
Expand Down Expand Up @@ -327,6 +328,7 @@ impl GrowableMmap {

fn create_mmap(&self, new_mmap_size: usize, offset: usize) -> Result<MmapMut, FraosError> {
if let Some(file) = &self.file {
// that fills the file with zeros
file.set_len((offset + new_mmap_size) as u64)
.map_err(|err| FraosError::FileError(FileError::Extend(err)))?;
unsafe {
Expand Down
176 changes: 174 additions & 2 deletions fraos/src/seqno.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,20 @@ impl SeqNoIndex {
///
/// * `path` - the path to the file. It will be created if not exists.
pub fn new(path: Option<PathBuf>, writable: bool) -> Result<Self, FraosError> {
Appender::new(path, None, writable).map(|inner| Self { inner })
let mut appender =
Appender::new(path.clone(), None, writable).map(|inner| Self { inner })?;
let (_, last_len) = match appender.last()? {
None => return Ok(appender),
Some(some) => some,
};

if last_len == 0 {
let actual_len = appender.find_actual_end()?;
appender = Appender::new(path, Some(2 * Self::SIZE_OF_USIZE * actual_len), writable)
.map(|inner| Self { inner })?;
}

Ok(appender)
}

/// Add records to index. This function will block if another write is still
Expand Down Expand Up @@ -54,6 +67,21 @@ impl SeqNoIndex {
Ok(Some(current_seqno))
}

pub fn get_length_at(&self, at: usize) -> Result<usize, FraosError> {
Ok(self
.get_offset_and_length(at)?
.ok_or(FraosError::IndexFileDamaged)?
.1)
}

#[allow(unused)]
pub fn get_offset_at(&self, at: usize) -> Result<usize, FraosError> {
Ok(self
.get_offset_and_length(at)?
.ok_or(FraosError::IndexFileDamaged)?
.0)
}

/// Get the location of a record with the given number.
pub fn get_offset_and_length(
&self,
Expand Down Expand Up @@ -114,12 +142,47 @@ impl SeqNoIndex {
pub(crate) fn mmaps_count(&self) -> Result<usize, FraosError> {
self.inner.mmaps_count()
}

pub(crate) fn find_actual_end(&self) -> Result<usize, FraosError> {
let mut start = 0;
let len = self.len();
let mut end = self.len();

// empty index was created or index is empty
if self.get_length_at(start)? == 0 || end == 0 {
return Ok(0);
}

// all elements are non-zero
if self.get_length_at(end.saturating_sub(1))? != 0 {
return Ok(end);
}

while start < len.saturating_sub(1) {
if self.get_length_at(start)? != 0 && self.get_length_at(start + 1)? == 0 {
return Ok(start + 1);
}
let mid = (start + end) / 2;
if self.get_length_at(mid)? == 0 {
end = mid;
} else {
start = mid;
}
}

Err(FraosError::IndexFileDamaged)
}
}

#[cfg(test)]
mod tests {
use super::SeqNoIndex;
use crate::FraosError;

use crate::{FileError, FraosError, MmapError};
use memmap2::{MmapMut, MmapOptions};
use std::fs::{File, OpenOptions};
use std::mem::size_of;
use std::path::PathBuf;

#[quickcheck]
fn test_read_write(records: Vec<(usize, usize)>) {
Expand Down Expand Up @@ -186,4 +249,113 @@ mod tests {
FraosError::EmptyRecordAppended
));
}

fn get_file(path: PathBuf, writable: bool) -> Result<File, FraosError> {
let mut options = OpenOptions::new();
options.read(true);
if writable {
options.write(true).create(true);
};

options
.open(&path)
.map_err(|err| FraosError::FileError(FileError::FileOpen(path.clone(), err)))
}

fn allocate_mmap(file: &File, size: usize) -> Result<MmapMut, FraosError> {
// that fills the file with zeros
file.set_len(size as u64)
.map_err(|err| FraosError::FileError(FileError::Extend(err)))?;
unsafe { MmapOptions::new().len(size).offset(0u64).map_mut(file) }
.map_err(|err| FraosError::MmapError(MmapError::Mmap(err)))
}

#[test]
fn check_index_recovery_zero_length() {
for i in 0..20 {
let tmp = tempfile::NamedTempFile::new().unwrap();

let file = get_file(tmp.path().to_path_buf(), true).unwrap();

if i != 0 {
let mmap = allocate_mmap(&file, size_of::<usize>() * i).unwrap();
mmap.flush().unwrap();
}

let index = SeqNoIndex::new(Some(tmp.path().to_path_buf()), true);
assert!(index.is_ok(), "can't create seqno index with {} usizes", i);
let index = index.unwrap();
assert!(index.is_empty());

index.append(&[(5000, 5), (5005, 6)]).unwrap();
drop(index);

let index = SeqNoIndex::new(Some(tmp.path().to_path_buf()), true);
assert!(
index.is_ok(),
"can't create seqno index with {} usizes after append",
i,
);
let index = index.unwrap();
assert_eq!(
index.len(),
2,
"seqno index should have len 2 after append at {}",
i
);
}
}

#[test]
fn check_index_recovery_non_zero_length() {
for (non_zeros, zeros) in [(2, 0), (100, 0), (2, 1), (2, 5), (2, 10), (258, 400)] {
let tmp = tempfile::NamedTempFile::new().unwrap();

let file = get_file(tmp.path().to_path_buf(), true).unwrap();

let mut mmap = allocate_mmap(&file, size_of::<usize>() * (non_zeros + zeros)).unwrap();
for i in 0..non_zeros {
mmap.as_mut()[i * size_of::<usize>()..(i + 1) * size_of::<usize>()]
.copy_from_slice(&i.to_le_bytes()[..]);
}
mmap.flush().unwrap();

let index = SeqNoIndex::new(Some(tmp.path().to_path_buf()), true);
assert!(
index.is_ok(),
"can't create seqno index with {} non zeros and {} zeros",
non_zeros,
zeros
);
let index = index.unwrap();
assert_eq!(
index.len(),
non_zeros / 2,
"seqno index with {} non zeros and {} zeros should have len {}",
non_zeros,
zeros,
non_zeros / 2
);

index.append(&[(5000, 5), (5005, 6)]).unwrap();
drop(index);

let index = SeqNoIndex::new(Some(tmp.path().to_path_buf()), true);
assert!(
index.is_ok(),
"can't create seqno index with {} non zeros and {} zeros after append",
non_zeros,
zeros
);
let index = index.unwrap();
assert_eq!(
index.len(),
non_zeros / 2 + 2,
"seqno index with {} non zeros and {} zeros should have len {} after append",
non_zeros,
zeros,
non_zeros / 2 + 2
);
}
}
}

0 comments on commit f28fedc

Please sign in to comment.