Skip to content

Commit

Permalink
add unstable entries size (#441)
Browse files Browse the repository at this point in the history
* add entries size

Signed-off-by: gengliqi <[email protected]>
  • Loading branch information
gengliqi authored Jun 10, 2021
1 parent 905e65d commit b90f0c9
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 6 deletions.
67 changes: 61 additions & 6 deletions src/log_unstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// limitations under the License.

use crate::eraftpb::{Entry, Snapshot};
use crate::util::entry_approximate_size;
use slog::Logger;

/// The `unstable.entries[i]` has raft log position `i+unstable.offset`.
Expand All @@ -31,6 +32,9 @@ pub struct Unstable {
/// All entries that have not yet been written to storage.
pub entries: Vec<Entry>,

/// The size of entries
pub entries_size: usize,

/// The offset from the vector index.
pub offset: u64,

Expand All @@ -45,6 +49,7 @@ impl Unstable {
offset,
snapshot: None,
entries: vec![],
entries_size: 0,
logger,
}
}
Expand Down Expand Up @@ -103,6 +108,7 @@ impl Unstable {
}
self.offset = entry.get_index() + 1;
self.entries.clear();
self.entries_size = 0;
} else {
fatal!(
self.logger,
Expand Down Expand Up @@ -137,6 +143,7 @@ impl Unstable {
/// From a given snapshot, restores the snapshot to self, but doesn't unpack.
pub fn restore(&mut self, snap: Snapshot) {
self.entries.clear();
self.entries_size = 0;
self.offset = snap.get_metadata().index + 1;
self.snapshot = Some(snap);
}
Expand All @@ -155,13 +162,21 @@ impl Unstable {
// portion, so set the offset and replace the entries
self.offset = after;
self.entries.clear();
self.entries_size = 0;
} else {
// truncate to after and copy to self.entries then append
let off = self.offset;
self.must_check_outofbounds(off, after);
for e in &self.entries[(after - off) as usize..] {
self.entries_size -= entry_approximate_size(e);
}
self.entries.truncate((after - off) as usize);
}
self.entries.extend_from_slice(ents);
self.entries_size += ents
.iter()
.map(|ent| entry_approximate_size(ent))
.sum::<usize>();
}

/// Returns a slice of entries between the high and low.
Expand Down Expand Up @@ -202,6 +217,7 @@ impl Unstable {
mod test {
use crate::eraftpb::{Entry, Snapshot, SnapshotMetadata};
use crate::log_unstable::Unstable;
use crate::util::entry_approximate_size;

fn new_entry(index: u64, term: u64) -> Entry {
let mut e = Entry::default();
Expand Down Expand Up @@ -231,9 +247,16 @@ mod test {
(None, 5, Some(new_snapshot(4, 1)), true, 5),
];

for (entries, offset, snapshot, wok, windex) in tests {
for (e, offset, snapshot, wok, windex) in tests {
let mut entries_size = 0;
let mut entries = vec![];
if let Some(entry) = e {
entries_size = entry_approximate_size(&entry);
entries = vec![entry];
}
let u = Unstable {
entries: entries.map_or(vec![], |entry| vec![entry]),
entries,
entries_size,
offset,
snapshot,
logger: crate::default_logger(),
Expand All @@ -258,9 +281,16 @@ mod test {
(None, 0, None, false, 0),
];

for (entries, offset, snapshot, wok, windex) in tests {
for (e, offset, snapshot, wok, windex) in tests {
let mut entries_size = 0;
let mut entries = vec![];
if let Some(entry) = e {
entries_size = entry_approximate_size(&entry);
entries = vec![entry];
}
let u = Unstable {
entries: entries.map_or(vec![], |entry| vec![entry]),
entries,
entries_size,
offset,
snapshot,
logger: crate::default_logger(),
Expand Down Expand Up @@ -319,9 +349,16 @@ mod test {
(None, 0, None, 5, false, 0),
];

for (entries, offset, snapshot, index, wok, wterm) in tests {
for (e, offset, snapshot, index, wok, wterm) in tests {
let mut entries_size = 0;
let mut entries = vec![];
if let Some(entry) = e {
entries_size = entry_approximate_size(&entry);
entries = vec![entry];
}
let u = Unstable {
entries: entries.map_or(vec![], |entry| vec![entry]),
entries,
entries_size,
offset,
snapshot,
logger: crate::default_logger(),
Expand All @@ -338,6 +375,7 @@ mod test {
fn test_restore() {
let mut u = Unstable {
entries: vec![new_entry(5, 1)],
entries_size: entry_approximate_size(&new_entry(5, 1)),
offset: 5,
snapshot: Some(new_snapshot(4, 1)),
logger: crate::default_logger(),
Expand All @@ -348,14 +386,20 @@ mod test {

assert_eq!(u.offset, s.get_metadata().index + 1);
assert!(u.entries.is_empty());
assert_eq!(u.entries_size, 0);
assert_eq!(u.snapshot.unwrap(), s);
}

#[test]
fn test_stable_snapshot_and_entries() {
let ents = vec![new_entry(5, 1), new_entry(5, 2), new_entry(6, 3)];
let entries_size = ents
.iter()
.map(|ent| entry_approximate_size(ent))
.sum::<usize>();
let mut u = Unstable {
entries: ents.clone(),
entries_size,
offset: 5,
snapshot: Some(new_snapshot(4, 1)),
logger: crate::default_logger(),
Expand All @@ -364,6 +408,7 @@ mod test {
u.stable_snap(4);
u.stable_entries(6, 3);
assert!(u.entries.is_empty());
assert_eq!(u.entries_size, 0);
assert_eq!(u.offset, 7);
}

Expand Down Expand Up @@ -422,15 +467,25 @@ mod test {
];

for (entries, offset, snapshot, to_append, woffset, wentries) in tests {
let entries_size = entries
.iter()
.map(|ent| entry_approximate_size(ent))
.sum::<usize>();
let mut u = Unstable {
entries,
entries_size,
offset,
snapshot,
logger: crate::default_logger(),
};
u.truncate_and_append(&to_append);
assert_eq!(u.offset, woffset);
assert_eq!(u.entries, wentries);
let entries_size = wentries
.iter()
.map(|ent| entry_approximate_size(ent))
.sum::<usize>();
assert_eq!(u.entries_size, entries_size);
}
}
}
22 changes: 22 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,25 @@ impl<'a> Union<'a> {
self.first.len() + self.second.len() - self.second.intersection(&self.first).count()
}
}

/// Get the approximate size of entry
#[inline]
pub fn entry_approximate_size(e: &Entry) -> usize {
// message Entry {
// EntryType entry_type = 1;
// uint64 term = 2;
// uint64 index = 3;
// bytes data = 4;
// bytes context = 6;
// bool sync_log = 5;(Deprecated)
// }
// Each field has tag(1 byte) if it's not default value.
// Tips: x bytes can represent a value up to 1 << x*7 - 1,
// So 1 byte => 127, 2 bytes => 16383, 3 bytes => 2097151.
// If entry_type is normal(default), in general, the size should
// be tag(4) + term(1) + index(2) + data(2) + context(1) = 10.
// If entry_type is conf change, in general, the size should be
// tag(5) + entry_type(1) + term(1) + index(2) + data(1) + context(1) = 11.
// We choose 12 in case of large index or large data for normal entry.
e.data.len() + e.context.len() + 12
}

0 comments on commit b90f0c9

Please sign in to comment.