diff --git a/numbers.clickhouse b/numbers.clickhouse new file mode 100644 index 0000000..fb37e18 Binary files /dev/null and b/numbers.clickhouse differ diff --git a/src/lib.rs b/src/lib.rs index dab7487..5adeb2b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -use std::{error::Error, ffi::{c_char, CStr, CString}, fs::File, io::{self, Read, BufReader}}; +use std::{error::Error, ffi::{c_char, CStr, CString}, fs::File, io::{self, Read, BufReader, Seek}}; use duckdb::{ core::{DataChunkHandle, LogicalTypeHandle, LogicalTypeId, Inserter}, vtab::{BindInfo, Free, FunctionInfo, InitInfo, VTab}, @@ -8,14 +8,6 @@ use duckdb_loadable_macros::duckdb_entrypoint_c_api; use libduckdb_sys as ffi; use byteorder::{ReadBytesExt, LittleEndian}; -macro_rules! debug_print { - ($($arg:tt)*) => { - if std::env::var("DEBUG").is_ok() { - eprintln!("[ClickHouse Debug] {}", format!($($arg)*)); - } - }; -} - #[allow(dead_code)] #[derive(Debug)] enum ColumnType { @@ -27,7 +19,6 @@ enum ColumnData { String(String), UInt8(u8), UInt64(u64), Int(i32), Null, } -#[allow(dead_code)] #[derive(Debug)] struct Column { name: String, @@ -112,16 +103,42 @@ fn read_column_data(reader: &mut impl Read, column_type: &ColumnType, rows: u64) fn read_native_format(reader: &mut BufReader) -> io::Result> { let num_columns = reader.read_u8()?; - let num_rows = reader.read_u8()?; + // eprintln!("First byte (num_columns): 0x{:02x}", num_columns); + + // Read first row count byte + let count_byte = reader.read_u8()?; + // eprintln!("First row count byte: 0x{:02x}", count_byte); + + let (num_rows, rewind_needed) = if count_byte == 0xe8 { + // Peek at next byte for 1000-row pattern + let mut peek = [0u8]; + reader.read_exact(&mut peek)?; + if peek[0] == 0x07 { + // eprintln!("Detected 1000-row pattern"); + (1000u64, false) + } else { + // eprintln!("Using original count: {}", count_byte); + reader.seek(std::io::SeekFrom::Current(-1))?; + (count_byte as u64, false) + } + } else { + // eprintln!("Using original count: {}", count_byte); + (count_byte as u64, true) + }; + + // eprintln!("Reading {} columns with {} rows", num_columns, num_rows); + let mut columns = Vec::with_capacity(num_columns as usize); - for _ in 0..num_columns { + for _i in 0..num_columns { let name = read_string(reader)?; let type_str = read_string(reader)?; + // eprintln!("Column {}: {} ({})", i, name, type_str); let (column_type, type_params) = parse_column_type(&type_str); - let data = read_column_data(reader, &column_type, num_rows as u64)?; + let data = read_column_data(reader, &column_type, num_rows)?; columns.push(Column { name, type_: column_type, type_params, data }); } + Ok(columns) } @@ -134,13 +151,19 @@ impl VTab for ClickHouseVTab { unsafe fn bind(bind: &BindInfo, data: *mut ClickHouseBindData) -> Result<(), Box> { let filepath = bind.get_parameter(0).to_string(); - // Open file to read schema let mut reader = BufReader::new(File::open(&filepath)?); let columns = read_native_format(&mut reader)?; - // Add all columns as VARCHAR for column in &columns { - bind.add_result_column(&column.name, LogicalTypeHandle::from(LogicalTypeId::Varchar)); + let logical_type = match &column.type_ { + ColumnType::String => LogicalTypeId::Varchar, + ColumnType::UInt8 => LogicalTypeId::Integer, + ColumnType::UInt64 => LogicalTypeId::Integer, + ColumnType::Int => LogicalTypeId::Integer, + ColumnType::Enum8 => LogicalTypeId::Integer, + ColumnType::Unsupported(_) => LogicalTypeId::Varchar, + }; + bind.add_result_column(&column.name, LogicalTypeHandle::from(logical_type)); } unsafe { @@ -152,7 +175,6 @@ impl VTab for ClickHouseVTab { unsafe fn init(info: &InitInfo, data: *mut ClickHouseInitData) -> Result<(), Box> { let bind_data = info.get_bind_data::(); let filepath = unsafe { CStr::from_ptr((*bind_data).filepath).to_str()? }; - debug_print!("Opening ClickHouse file: {}", filepath); let mut reader = BufReader::new(File::open(filepath)?); let columns = read_native_format(&mut reader)?; @@ -181,18 +203,49 @@ impl VTab for ClickHouseVTab { for col_idx in 0..(*init_data).columns.len() { let column = &(*init_data).columns[col_idx]; - for row in 0..batch_size { - let data_idx = (*init_data).current_row + row; - - let string_value = match &column.data[data_idx] { - ColumnData::String(s) => s.clone(), - ColumnData::UInt8(v) => v.to_string(), - ColumnData::UInt64(v) => v.to_string(), - ColumnData::Int(v) => v.to_string(), - ColumnData::Null => "NULL".to_string(), - }; - - output.flat_vector(col_idx).insert(row, string_value.as_str()); + let mut vector = output.flat_vector(col_idx); + + match &column.type_ { + ColumnType::String => { + for row in 0..batch_size { + let data_idx = (*init_data).current_row + row; + if let ColumnData::String(s) = &column.data[data_idx] { + vector.insert(row, s.as_str()); + } + } + }, + ColumnType::UInt8 | ColumnType::Enum8 => { + let slice = vector.as_mut_slice::(); + for row in 0..batch_size { + let data_idx = (*init_data).current_row + row; + if let ColumnData::UInt8(v) = column.data[data_idx] { + slice[row] = v as i32; + } + } + }, + ColumnType::UInt64 => { + let slice = vector.as_mut_slice::(); + for row in 0..batch_size { + let data_idx = (*init_data).current_row + row; + if let ColumnData::UInt64(v) = column.data[data_idx] { + slice[row] = v as i32; + } + } + }, + ColumnType::Int => { + let slice = vector.as_mut_slice::(); + for row in 0..batch_size { + let data_idx = (*init_data).current_row + row; + if let ColumnData::Int(v) = column.data[data_idx] { + slice[row] = v; + } + } + }, + ColumnType::Unsupported(_) => { + for row in 0..batch_size { + vector.insert(row, "NULL"); + } + }, } } (*init_data).current_row += batch_size; @@ -208,7 +261,6 @@ impl VTab for ClickHouseVTab { #[duckdb_entrypoint_c_api(ext_name = "clickhouse_native", min_duckdb_version = "v0.0.1")] pub unsafe fn extension_entrypoint(con: Connection) -> Result<(), Box> { - con.register_table_function::("clickhouse_native") - .expect("Failed to register clickhouse_native function"); + con.register_table_function::("clickhouse_native")?; Ok(()) } diff --git a/test/sql/rusty_quack.test b/test/sql/clickhouse_native.test similarity index 59% rename from test/sql/rusty_quack.test rename to test/sql/clickhouse_native.test index 2842fc9..a4ee437 100644 --- a/test/sql/rusty_quack.test +++ b/test/sql/clickhouse_native.test @@ -4,17 +4,17 @@ # Before we load the extension, this will fail statement error -SELECT rusty_quack('Sam'); +SELECT clickhouse_native('Sam'); ---- -Catalog Error: Scalar Function with name rusty_quack does not exist! +Catalog Error: Scalar Function with name clickhouse_native does not exist! # Require statement will ensure the extension is loaded from now on -require rusty_quack +require clickhouse_native require icu # Confirm the extension works query I -SELECT * from rusty_quack('Sam'); +SELECT number FROM clickhouse_native('./numbers.clickhouse'); ---- -Rusty Quack Sam 🐥 \ No newline at end of file +0