diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 481866ee7..336a96a36 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -1191,83 +1191,184 @@ impl BTreeCursor { } } - /// Balance the root page. - /// This is done when the root page overflows, and we need to create a new root page. - /// See e.g. https://en.wikipedia.org/wiki/B-tree - fn balance_root(&mut self) { - /* todo: balance deeper, create child and copy contents of root there. Then split root */ - /* if we are in root page then we just need to create a new root and push key there */ + fn calculate_used_space(&self, page: &PageContent) -> usize { + let mut total_used = 0; + total_used += page.header_size(); + let (_, cell_array_size) = page.cell_pointer_array_offset_and_size(); + total_used += cell_array_size; + for cell_idx in 0..page.cell_count() { + let (_, cell_len) = page.cell_get_raw_region( + cell_idx, + self.payload_overflow_threshold_max(page.page_type()), + self.payload_overflow_threshold_min(page.page_type()), + self.usable_space(), + ); + total_used += cell_len; + } + for overflow_cell in &page.overflow_cells { + total_used += overflow_cell.payload.len(); + } + total_used -= page.num_frag_free_bytes() as usize; - let is_page_1 = { - let current_root = self.stack.top(); - current_root.get().id == 1 - }; + total_used + } + fn estimate_cell_pointer_growth(&self) -> usize { + // In SQLite, we want to leave some space for future cell pointers + // Each cell pointer takes 2 bytes + const CELL_POINTER_SIZE: usize = 2; + const ESTIMATED_FUTURE_CELLS: usize = 4; // Leave room for a few more cells + + CELL_POINTER_SIZE * ESTIMATED_FUTURE_CELLS + } + + fn is_overflow(&self, page: &PageContent) -> bool { + if !page.overflow_cells.is_empty() { + return true; + } + + let used_space = self.calculate_used_space(page); + let available_space = self.usable_space(); + + // Check if we're about to exceed usable space + // We also need some buffer space for future cell pointers + let required_space = used_space + self.estimate_cell_pointer_growth(); + required_space > available_space + } + + fn is_underflow(&self, page: &PageContent) -> bool { + // Root is special case - only underflow when empty with one child + let current_page = self.stack.top(); + if current_page.get().id == self.root_page { + return page.cell_count() == 0 && !page.is_leaf(); + } + + let used_space = self.calculate_used_space(page); + let usable_space = self.usable_space(); + + // Underflow if used space is less than 50% of usable space + used_space < (usable_space / 2) + } + + fn balance_root(&mut self) { + let current_root = self.stack.top(); + let contents = current_root.get().contents.as_mut().unwrap(); + let is_page_1 = current_root.get().id == 1; let offset = if is_page_1 { DATABASE_HEADER_SIZE } else { 0 }; - let new_root_page = self.allocate_page(PageType::TableInterior, offset); - { - let current_root = self.stack.top(); - let current_root_contents = current_root.get().contents.as_ref().unwrap(); - let new_root_page_id = new_root_page.get().id; - let new_root_page_contents = new_root_page.get().contents.as_mut().unwrap(); - if is_page_1 { - // Copy header - let current_root_buf = current_root_contents.as_ptr(); - let new_root_buf = new_root_page_contents.as_ptr(); - new_root_buf[0..DATABASE_HEADER_SIZE] - .copy_from_slice(¤t_root_buf[0..DATABASE_HEADER_SIZE]); + if self.is_underflow(contents) { + let child_page_id = contents.rightmost_pointer().unwrap(); + let child_page = self + .pager + .read_page(child_page_id as usize) + .expect("This shouldn't have happened, child page not found"); + let child_contents = child_page.get().contents.as_mut().unwrap(); + + let necessary_space = self.calculate_used_space(child_contents); + let available_space = if is_page_1 { + self.usable_space() - DATABASE_HEADER_SIZE + } else { + self.usable_space() + }; + + // If root can't consume child, leave it empty temporarily and handle it in balance_leaf step + if available_space < necessary_space { + return; + } + + let grandchild_ptr = child_contents.rightmost_pointer(); + + let mut cells = Vec::new(); + for idx in 0..child_contents.cell_count() { + let (start, len) = child_contents.cell_get_raw_region( + idx, + self.payload_overflow_threshold_max(child_contents.page_type()), + self.payload_overflow_threshold_min(child_contents.page_type()), + self.usable_space(), + ); + let buf = child_contents.as_ptr(); + cells.push(buf[start..start + len].to_vec()); + } + + // TODO: free the child page by adding it to the free list. Functionality has to be added to pager.rs + + contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0); + contents.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0); + if !contents.is_leaf() { + contents.write_u32( + PAGE_HEADER_OFFSET_RIGHTMOST_PTR, + grandchild_ptr.unwrap_or(0), + ); + } + + // Insert cells into root + for (idx, cell) in cells.iter().enumerate() { + self.insert_into_cell(contents, cell, idx); } - // point new root right child to previous root - new_root_page_contents - .write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, new_root_page_id as u32); - new_root_page_contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0); } - /* swap splitted page buffer with new root buffer so we don't have to update page idx */ - { - let (root_id, child_id, child) = { - let page_ref = self.stack.top(); - let child = page_ref.clone(); - - // Swap the entire Page structs - std::mem::swap(&mut child.get().id, &mut new_root_page.get().id); - // TODO:: shift bytes by offset to left on child because now child has offset 100 - // and header bytes - // Also change the offset of page - // - if is_page_1 { - // Remove header from child and set offset to 0 - let contents = child.get().contents.as_mut().unwrap(); - let (cell_pointer_offset, _) = contents.cell_pointer_array_offset_and_size(); - // change cell pointers - for cell_idx in 0..contents.cell_count() { - let cell_pointer_offset = cell_pointer_offset + (2 * cell_idx) - offset; - let pc = contents.read_u16(cell_pointer_offset); - contents.write_u16(cell_pointer_offset, pc - offset as u16); - } + if self.is_overflow(contents) { + // Create new child page to hold current root contents + let child_page = self.allocate_page(contents.page_type(), 0); + let child_page_id = child_page.get().id; + let child_contents = child_page.get().contents.as_mut().unwrap(); + + // Move all cells and right pointer from root to child + let mut cells = Vec::new(); + for idx in 0..contents.cell_count() { + let (start, len) = contents.cell_get_raw_region( + idx, + self.payload_overflow_threshold_max(contents.page_type()), + self.payload_overflow_threshold_min(contents.page_type()), + self.usable_space(), + ); + let buf = contents.as_ptr(); + cells.push(buf[start..start + len].to_vec()); + } + let right_pointer = contents.rightmost_pointer(); - contents.offset = 0; - let buf = contents.as_ptr(); - buf.copy_within(DATABASE_HEADER_SIZE.., 0); - } + // Clear root and convert to interior node + contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0); + contents.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0); + contents.write_u8(0, PageType::TableInterior as u8); - self.pager.add_dirty(new_root_page.get().id); - self.pager.add_dirty(child.get().id); - (new_root_page.get().id, child.get().id, child) - }; + // Initialize child with root's old contents + for (idx, cell) in cells.iter().enumerate() { + self.insert_into_cell(child_contents, cell, idx); + } + if !child_contents.is_leaf() { + child_contents + .write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, right_pointer.unwrap_or(0)); + } + + // Handle page 1 special case + if is_page_1 { + self.handle_page_one_special_case(child_contents, offset); + } - debug!("Balancing root. root={}, rightmost={}", root_id, child_id); - let root = new_root_page.clone(); + self.pager.add_dirty(child_page_id); - self.root_page = root_id; + // Update page stack self.stack.clear(); - self.stack.push(root.clone()); - self.stack.push(child.clone()); + self.stack.push(current_root.clone()); + self.stack.push(child_page.clone()); + } + } - self.pager.put_loaded_page(root_id, root); - self.pager.put_loaded_page(child_id, child); + // Helper function to handle page 1's special case + fn handle_page_one_special_case(&self, contents: &mut PageContent, offset: usize) { + let (cell_pointer_offset, _) = contents.cell_pointer_array_offset_and_size(); + + // Update cell pointers to account for removed header + for cell_idx in 0..contents.cell_count() { + let cell_pointer_offset = cell_pointer_offset + (2 * cell_idx) - offset; + let pc = contents.read_u16(cell_pointer_offset); + contents.write_u16(cell_pointer_offset, pc - offset as u16); } + + contents.offset = 0; + let buf = contents.as_ptr(); + buf.copy_within(DATABASE_HEADER_SIZE.., 0); } /// Allocate a new page to the btree via the pager.