Skip to content

Commit

Permalink
Extended balance_root to handle root underflow case
Browse files Browse the repository at this point in the history
Did some code refactoring.
  • Loading branch information
krishvishal committed Jan 5, 2025
1 parent fdbf62d commit a1bd833
Showing 1 changed file with 164 additions and 63 deletions.
227 changes: 164 additions & 63 deletions core/storage/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1191,83 +1191,184 @@ impl BTreeCursor {
}
}

/// Balance the root page.
/// This is done when the root page overflows, and we need to create a new root page.
/// See e.g. https://en.wikipedia.org/wiki/B-tree
fn balance_root(&mut self) {
/* todo: balance deeper, create child and copy contents of root there. Then split root */
/* if we are in root page then we just need to create a new root and push key there */
fn calculate_used_space(&self, page: &PageContent) -> usize {
let mut total_used = 0;
total_used += page.header_size();
let (_, cell_array_size) = page.cell_pointer_array_offset_and_size();
total_used += cell_array_size;
for cell_idx in 0..page.cell_count() {
let (_, cell_len) = page.cell_get_raw_region(
cell_idx,
self.payload_overflow_threshold_max(page.page_type()),
self.payload_overflow_threshold_min(page.page_type()),
self.usable_space(),
);
total_used += cell_len;
}
for overflow_cell in &page.overflow_cells {
total_used += overflow_cell.payload.len();
}
total_used -= page.num_frag_free_bytes() as usize;

let is_page_1 = {
let current_root = self.stack.top();
current_root.get().id == 1
};
total_used
}

fn estimate_cell_pointer_growth(&self) -> usize {
// In SQLite, we want to leave some space for future cell pointers
// Each cell pointer takes 2 bytes
const CELL_POINTER_SIZE: usize = 2;
const ESTIMATED_FUTURE_CELLS: usize = 4; // Leave room for a few more cells

CELL_POINTER_SIZE * ESTIMATED_FUTURE_CELLS
}

fn is_overflow(&self, page: &PageContent) -> bool {
if !page.overflow_cells.is_empty() {
return true;
}

let used_space = self.calculate_used_space(page);
let available_space = self.usable_space();

// Check if we're about to exceed usable space
// We also need some buffer space for future cell pointers
let required_space = used_space + self.estimate_cell_pointer_growth();
required_space > available_space
}

fn is_underflow(&self, page: &PageContent) -> bool {
// Root is special case - only underflow when empty with one child
let current_page = self.stack.top();
if current_page.get().id == self.root_page {
return page.cell_count() == 0 && !page.is_leaf();
}

let used_space = self.calculate_used_space(page);
let usable_space = self.usable_space();

// Underflow if used space is less than 50% of usable space
used_space < (usable_space / 2)
}

fn balance_root(&mut self) {
let current_root = self.stack.top();
let contents = current_root.get().contents.as_mut().unwrap();
let is_page_1 = current_root.get().id == 1;
let offset = if is_page_1 { DATABASE_HEADER_SIZE } else { 0 };
let new_root_page = self.allocate_page(PageType::TableInterior, offset);
{
let current_root = self.stack.top();
let current_root_contents = current_root.get().contents.as_ref().unwrap();

let new_root_page_id = new_root_page.get().id;
let new_root_page_contents = new_root_page.get().contents.as_mut().unwrap();
if is_page_1 {
// Copy header
let current_root_buf = current_root_contents.as_ptr();
let new_root_buf = new_root_page_contents.as_ptr();
new_root_buf[0..DATABASE_HEADER_SIZE]
.copy_from_slice(&current_root_buf[0..DATABASE_HEADER_SIZE]);
if self.is_underflow(contents) {
let child_page_id = contents.rightmost_pointer().unwrap();
let child_page = self
.pager
.read_page(child_page_id as usize)
.expect("This shouldn't have happened, child page not found");
let child_contents = child_page.get().contents.as_mut().unwrap();

let necessary_space = self.calculate_used_space(child_contents);
let available_space = if is_page_1 {
self.usable_space() - DATABASE_HEADER_SIZE
} else {
self.usable_space()
};

// If root can't consume child, leave it empty temporarily and handle it in balance_leaf step
if available_space < necessary_space {
return;
}

let grandchild_ptr = child_contents.rightmost_pointer();

let mut cells = Vec::new();
for idx in 0..child_contents.cell_count() {
let (start, len) = child_contents.cell_get_raw_region(
idx,
self.payload_overflow_threshold_max(child_contents.page_type()),
self.payload_overflow_threshold_min(child_contents.page_type()),
self.usable_space(),
);
let buf = child_contents.as_ptr();
cells.push(buf[start..start + len].to_vec());
}

// TODO: free the child page by adding it to the free list. Functionality has to be added to pager.rs

contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0);
contents.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0);
if !contents.is_leaf() {
contents.write_u32(
PAGE_HEADER_OFFSET_RIGHTMOST_PTR,
grandchild_ptr.unwrap_or(0),
);
}

// Insert cells into root
for (idx, cell) in cells.iter().enumerate() {
self.insert_into_cell(contents, cell, idx);
}
// point new root right child to previous root
new_root_page_contents
.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, new_root_page_id as u32);
new_root_page_contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0);
}

/* swap splitted page buffer with new root buffer so we don't have to update page idx */
{
let (root_id, child_id, child) = {
let page_ref = self.stack.top();
let child = page_ref.clone();

// Swap the entire Page structs
std::mem::swap(&mut child.get().id, &mut new_root_page.get().id);
// TODO:: shift bytes by offset to left on child because now child has offset 100
// and header bytes
// Also change the offset of page
//
if is_page_1 {
// Remove header from child and set offset to 0
let contents = child.get().contents.as_mut().unwrap();
let (cell_pointer_offset, _) = contents.cell_pointer_array_offset_and_size();
// change cell pointers
for cell_idx in 0..contents.cell_count() {
let cell_pointer_offset = cell_pointer_offset + (2 * cell_idx) - offset;
let pc = contents.read_u16(cell_pointer_offset);
contents.write_u16(cell_pointer_offset, pc - offset as u16);
}
if self.is_overflow(contents) {
// Create new child page to hold current root contents
let child_page = self.allocate_page(contents.page_type(), 0);
let child_page_id = child_page.get().id;
let child_contents = child_page.get().contents.as_mut().unwrap();

// Move all cells and right pointer from root to child
let mut cells = Vec::new();
for idx in 0..contents.cell_count() {
let (start, len) = contents.cell_get_raw_region(
idx,
self.payload_overflow_threshold_max(contents.page_type()),
self.payload_overflow_threshold_min(contents.page_type()),
self.usable_space(),
);
let buf = contents.as_ptr();
cells.push(buf[start..start + len].to_vec());
}
let right_pointer = contents.rightmost_pointer();

contents.offset = 0;
let buf = contents.as_ptr();
buf.copy_within(DATABASE_HEADER_SIZE.., 0);
}
// Clear root and convert to interior node
contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0);
contents.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0);
contents.write_u8(0, PageType::TableInterior as u8);

self.pager.add_dirty(new_root_page.get().id);
self.pager.add_dirty(child.get().id);
(new_root_page.get().id, child.get().id, child)
};
// Initialize child with root's old contents
for (idx, cell) in cells.iter().enumerate() {
self.insert_into_cell(child_contents, cell, idx);
}
if !child_contents.is_leaf() {
child_contents
.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, right_pointer.unwrap_or(0));
}

// Handle page 1 special case
if is_page_1 {
self.handle_page_one_special_case(child_contents, offset);
}

debug!("Balancing root. root={}, rightmost={}", root_id, child_id);
let root = new_root_page.clone();
self.pager.add_dirty(child_page_id);

self.root_page = root_id;
// Update page stack
self.stack.clear();
self.stack.push(root.clone());
self.stack.push(child.clone());
self.stack.push(current_root.clone());
self.stack.push(child_page.clone());
}
}

self.pager.put_loaded_page(root_id, root);
self.pager.put_loaded_page(child_id, child);
// Helper function to handle page 1's special case
fn handle_page_one_special_case(&self, contents: &mut PageContent, offset: usize) {
let (cell_pointer_offset, _) = contents.cell_pointer_array_offset_and_size();

// Update cell pointers to account for removed header
for cell_idx in 0..contents.cell_count() {
let cell_pointer_offset = cell_pointer_offset + (2 * cell_idx) - offset;
let pc = contents.read_u16(cell_pointer_offset);
contents.write_u16(cell_pointer_offset, pc - offset as u16);
}

contents.offset = 0;
let buf = contents.as_ptr();
buf.copy_within(DATABASE_HEADER_SIZE.., 0);
}

/// Allocate a new page to the btree via the pager.
Expand Down

0 comments on commit a1bd833

Please sign in to comment.