Skip to content

Commit

Permalink
improve parser
Browse files Browse the repository at this point in the history
  • Loading branch information
lmangani committed Dec 20, 2024
1 parent 3edd0fc commit d22ed84
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 36 deletions.
Binary file added numbers.clickhouse
Binary file not shown.
114 changes: 83 additions & 31 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{error::Error, ffi::{c_char, CStr, CString}, fs::File, io::{self, Read, BufReader}};
use std::{error::Error, ffi::{c_char, CStr, CString}, fs::File, io::{self, Read, BufReader, Seek}};
use duckdb::{
core::{DataChunkHandle, LogicalTypeHandle, LogicalTypeId, Inserter},
vtab::{BindInfo, Free, FunctionInfo, InitInfo, VTab},
Expand All @@ -8,14 +8,6 @@ use duckdb_loadable_macros::duckdb_entrypoint_c_api;
use libduckdb_sys as ffi;
use byteorder::{ReadBytesExt, LittleEndian};

macro_rules! debug_print {
($($arg:tt)*) => {
if std::env::var("DEBUG").is_ok() {
eprintln!("[ClickHouse Debug] {}", format!($($arg)*));
}
};
}

#[allow(dead_code)]
#[derive(Debug)]
enum ColumnType {
Expand All @@ -27,7 +19,6 @@ enum ColumnData {
String(String), UInt8(u8), UInt64(u64), Int(i32), Null,
}

#[allow(dead_code)]
#[derive(Debug)]
struct Column {
name: String,
Expand Down Expand Up @@ -112,16 +103,42 @@ fn read_column_data(reader: &mut impl Read, column_type: &ColumnType, rows: u64)

fn read_native_format(reader: &mut BufReader<File>) -> io::Result<Vec<Column>> {
let num_columns = reader.read_u8()?;
let num_rows = reader.read_u8()?;
// eprintln!("First byte (num_columns): 0x{:02x}", num_columns);

// Read first row count byte
let count_byte = reader.read_u8()?;
// eprintln!("First row count byte: 0x{:02x}", count_byte);

let (num_rows, rewind_needed) = if count_byte == 0xe8 {
// Peek at next byte for 1000-row pattern
let mut peek = [0u8];
reader.read_exact(&mut peek)?;
if peek[0] == 0x07 {
// eprintln!("Detected 1000-row pattern");
(1000u64, false)
} else {
// eprintln!("Using original count: {}", count_byte);
reader.seek(std::io::SeekFrom::Current(-1))?;
(count_byte as u64, false)
}
} else {
// eprintln!("Using original count: {}", count_byte);
(count_byte as u64, true)
};

// eprintln!("Reading {} columns with {} rows", num_columns, num_rows);

let mut columns = Vec::with_capacity(num_columns as usize);

for _ in 0..num_columns {
for _i in 0..num_columns {
let name = read_string(reader)?;
let type_str = read_string(reader)?;
// eprintln!("Column {}: {} ({})", i, name, type_str);
let (column_type, type_params) = parse_column_type(&type_str);
let data = read_column_data(reader, &column_type, num_rows as u64)?;
let data = read_column_data(reader, &column_type, num_rows)?;
columns.push(Column { name, type_: column_type, type_params, data });
}

Ok(columns)
}

Expand All @@ -134,13 +151,19 @@ impl VTab for ClickHouseVTab {
unsafe fn bind(bind: &BindInfo, data: *mut ClickHouseBindData) -> Result<(), Box<dyn Error>> {
let filepath = bind.get_parameter(0).to_string();

// Open file to read schema
let mut reader = BufReader::new(File::open(&filepath)?);
let columns = read_native_format(&mut reader)?;

// Add all columns as VARCHAR
for column in &columns {
bind.add_result_column(&column.name, LogicalTypeHandle::from(LogicalTypeId::Varchar));
let logical_type = match &column.type_ {
ColumnType::String => LogicalTypeId::Varchar,
ColumnType::UInt8 => LogicalTypeId::Integer,
ColumnType::UInt64 => LogicalTypeId::Integer,
ColumnType::Int => LogicalTypeId::Integer,
ColumnType::Enum8 => LogicalTypeId::Integer,
ColumnType::Unsupported(_) => LogicalTypeId::Varchar,
};
bind.add_result_column(&column.name, LogicalTypeHandle::from(logical_type));
}

unsafe {
Expand All @@ -152,7 +175,6 @@ impl VTab for ClickHouseVTab {
unsafe fn init(info: &InitInfo, data: *mut ClickHouseInitData) -> Result<(), Box<dyn Error>> {
let bind_data = info.get_bind_data::<ClickHouseBindData>();
let filepath = unsafe { CStr::from_ptr((*bind_data).filepath).to_str()? };
debug_print!("Opening ClickHouse file: {}", filepath);

let mut reader = BufReader::new(File::open(filepath)?);
let columns = read_native_format(&mut reader)?;
Expand Down Expand Up @@ -181,18 +203,49 @@ impl VTab for ClickHouseVTab {

for col_idx in 0..(*init_data).columns.len() {
let column = &(*init_data).columns[col_idx];
for row in 0..batch_size {
let data_idx = (*init_data).current_row + row;

let string_value = match &column.data[data_idx] {
ColumnData::String(s) => s.clone(),
ColumnData::UInt8(v) => v.to_string(),
ColumnData::UInt64(v) => v.to_string(),
ColumnData::Int(v) => v.to_string(),
ColumnData::Null => "NULL".to_string(),
};

output.flat_vector(col_idx).insert(row, string_value.as_str());
let mut vector = output.flat_vector(col_idx);

match &column.type_ {
ColumnType::String => {
for row in 0..batch_size {
let data_idx = (*init_data).current_row + row;
if let ColumnData::String(s) = &column.data[data_idx] {
vector.insert(row, s.as_str());
}
}
},
ColumnType::UInt8 | ColumnType::Enum8 => {
let slice = vector.as_mut_slice::<i32>();
for row in 0..batch_size {
let data_idx = (*init_data).current_row + row;
if let ColumnData::UInt8(v) = column.data[data_idx] {
slice[row] = v as i32;
}
}
},
ColumnType::UInt64 => {
let slice = vector.as_mut_slice::<i32>();
for row in 0..batch_size {
let data_idx = (*init_data).current_row + row;
if let ColumnData::UInt64(v) = column.data[data_idx] {
slice[row] = v as i32;
}
}
},
ColumnType::Int => {
let slice = vector.as_mut_slice::<i32>();
for row in 0..batch_size {
let data_idx = (*init_data).current_row + row;
if let ColumnData::Int(v) = column.data[data_idx] {
slice[row] = v;
}
}
},
ColumnType::Unsupported(_) => {
for row in 0..batch_size {
vector.insert(row, "NULL");
}
},
}
}
(*init_data).current_row += batch_size;
Expand All @@ -208,7 +261,6 @@ impl VTab for ClickHouseVTab {

#[duckdb_entrypoint_c_api(ext_name = "clickhouse_native", min_duckdb_version = "v0.0.1")]
pub unsafe fn extension_entrypoint(con: Connection) -> Result<(), Box<dyn Error>> {
con.register_table_function::<ClickHouseVTab>("clickhouse_native")
.expect("Failed to register clickhouse_native function");
con.register_table_function::<ClickHouseVTab>("clickhouse_native")?;
Ok(())
}
10 changes: 5 additions & 5 deletions test/sql/rusty_quack.test → test/sql/clickhouse_native.test
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@

# Before we load the extension, this will fail
statement error
SELECT rusty_quack('Sam');
SELECT clickhouse_native('Sam');
----
Catalog Error: Scalar Function with name rusty_quack does not exist!
Catalog Error: Scalar Function with name clickhouse_native does not exist!

# Require statement will ensure the extension is loaded from now on
require rusty_quack
require clickhouse_native

require icu

# Confirm the extension works
query I
SELECT * from rusty_quack('Sam');
SELECT number FROM clickhouse_native('./numbers.clickhouse');
----
Rusty Quack Sam 🐥
0

0 comments on commit d22ed84

Please sign in to comment.