From 851e10f669778a4ec8155d513d4bee4db8ddaaa6 Mon Sep 17 00:00:00 2001 From: lmangani Date: Wed, 1 Jan 2025 15:16:11 +0000 Subject: [PATCH] working native driver baseline --- Cargo.lock | 7 +- Cargo.toml | 1 + src/clickhouse_scan.rs | 535 +++++++++++++++++++++++++++-------------- 3 files changed, 366 insertions(+), 177 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7435f26..6e8daa4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -381,13 +381,15 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.38" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", + "wasm-bindgen", "windows-targets", ] @@ -418,6 +420,7 @@ name = "chsql_native" version = "0.1.0" dependencies = [ "byteorder", + "chrono", "clickhouse-rs", "duckdb", "duckdb-loadable-macros", diff --git a/Cargo.toml b/Cargo.toml index 2bfad2d..59450a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,3 +29,4 @@ serde_json = "1.0.134" tokio = { version = "1.42.0", features = ["rt", "rt-multi-thread"] } serde = "1.0.217" clickhouse-rs = "=1.1.0-alpha.1" +chrono = "0.4.39" diff --git a/src/clickhouse_scan.rs b/src/clickhouse_scan.rs index cc3bb50..ec8bf93 100644 --- a/src/clickhouse_scan.rs +++ b/src/clickhouse_scan.rs @@ -6,6 +6,7 @@ use duckdb::{ }; use clickhouse_rs::{Pool, types::SqlType}; use tokio::runtime::Runtime; +use std::ptr; #[repr(C)] struct ClickHouseScanBindData { @@ -13,22 +14,31 @@ struct ClickHouseScanBindData { user: String, password: String, query: String, + column_names: Vec, + column_types: Vec, } impl Drop for ClickHouseScanBindData { - fn drop(&mut self) {} + fn drop(&mut self) { + self.column_names.clear(); + self.column_types.clear(); + } } impl Free for ClickHouseScanBindData { - fn free(&mut self) {} + fn free(&mut self) { + // Explicitly clear vectors to ensure proper cleanup + self.column_names.clear(); + self.column_types.clear(); + } } #[repr(C)] struct ClickHouseScanInitData { runtime: Option>, block_data: Option>>, - column_types: Option>, - column_names: Option>, + column_types: Vec, + column_names: Vec, current_row: usize, total_rows: usize, done: bool, @@ -39,8 +49,6 @@ impl Drop for ClickHouseScanInitData { self.done = true; self.runtime.take(); self.block_data.take(); - self.column_types.take(); - self.column_names.take(); } } @@ -53,6 +61,7 @@ impl Free for ClickHouseScanInitData { } fn map_clickhouse_type(sql_type: SqlType) -> LogicalTypeId { + // println!("Mapping SQL type: {:?}", sql_type); // Debug print match sql_type { SqlType::Int8 | SqlType::Int16 | SqlType::Int32 => LogicalTypeId::Integer, SqlType::Int64 => LogicalTypeId::Bigint, @@ -64,7 +73,8 @@ fn map_clickhouse_type(sql_type: SqlType) -> LogicalTypeId { SqlType::Date => LogicalTypeId::Date, SqlType::DateTime(_) => LogicalTypeId::Timestamp, SqlType::Bool => LogicalTypeId::Boolean, - _ => LogicalTypeId::Varchar, + // Default to Integer for numeric literals + _ => LogicalTypeId::Integer } } @@ -74,199 +84,374 @@ impl VTab for ClickHouseScanVTab { type InitData = ClickHouseScanInitData; type BindData = ClickHouseScanBindData; - unsafe fn bind(bind: &BindInfo, data: *mut Self::BindData) -> Result<(), Box> { - if data.is_null() { - return Err("Invalid bind data pointer".into()); + // patch +unsafe fn 
bind(bind: &BindInfo, data: *mut Self::BindData) -> Result<(), Box> { + if data.is_null() { + return Err("Invalid bind data pointer".into()); + } + + let query = bind.get_parameter(0).to_string(); + let url = bind.get_named_parameter("url") + .map(|v| v.to_string()) + .unwrap_or_else(|| std::env::var("CLICKHOUSE_URL") + .unwrap_or_else(|_| "tcp://localhost:9000".to_string())); + let user = bind.get_named_parameter("user") + .map(|v| v.to_string()) + .unwrap_or_else(|| std::env::var("CLICKHOUSE_USER") + .unwrap_or_else(|_| "default".to_string())); + let password = bind.get_named_parameter("password") + .map(|v| v.to_string()) + .unwrap_or_else(|| std::env::var("CLICKHOUSE_PASSWORD") + .unwrap_or_default()); + + // println!("Parameters - URL: {}, User: {}, Query: {}", url, user, query); + + let runtime = Arc::new(Runtime::new()?); + + let result = runtime.block_on(async { + let pool = Pool::new(url.clone()); + let mut client = pool.get_handle().await?; + let block = client.query(&query).fetch_all().await?; + + let columns = block.columns(); + let mut names = Vec::new(); + let mut types = Vec::new(); + + for col in columns { + names.push(col.name().to_string()); + types.push(map_clickhouse_type(col.sql_type())); } - let query = bind.get_parameter(0).to_string(); - let url = bind.get_named_parameter("url") - .map(|v| v.to_string()) - .unwrap_or_else(|| std::env::var("CLICKHOUSE_URL") - .unwrap_or_else(|_| "tcp://localhost:9000".to_string())); - let user = bind.get_named_parameter("user") - .map(|v| v.to_string()) - .unwrap_or_else(|| std::env::var("CLICKHOUSE_USER") - .unwrap_or_else(|_| "default".to_string())); - let password = bind.get_named_parameter("password") - .map(|v| v.to_string()) - .unwrap_or_else(|| std::env::var("CLICKHOUSE_PASSWORD") - .unwrap_or_default()); - - println!("Parameters - URL: {}, User: {}, Query: {}", url, user, query); - - unsafe { - (*data) = ClickHouseScanBindData { - url, - user, - password, - query, - }; + Ok::<(Vec, Vec), Box>((names, types)) + })?; + + let (names, types) = result; + + // Create a new vector by recreating LogicalTypeId values + let types_for_iteration: Vec = types.iter().map(|type_id| { + match type_id { + LogicalTypeId::Integer => LogicalTypeId::Integer, + LogicalTypeId::Bigint => LogicalTypeId::Bigint, + LogicalTypeId::UInteger => LogicalTypeId::UInteger, + LogicalTypeId::UBigint => LogicalTypeId::UBigint, + LogicalTypeId::Float => LogicalTypeId::Float, + LogicalTypeId::Double => LogicalTypeId::Double, + LogicalTypeId::Varchar => LogicalTypeId::Varchar, + LogicalTypeId::Date => LogicalTypeId::Date, + LogicalTypeId::Timestamp => LogicalTypeId::Timestamp, + LogicalTypeId::Boolean => LogicalTypeId::Boolean, + _ => LogicalTypeId::Varchar, } + }).collect(); + + // Create bind data + let bind_data = ClickHouseScanBindData { + url, + user, + password, + query, + column_names: names.clone(), + column_types: types, + }; + + // Add result columns before storing the bind data + for (name, type_id) in names.iter().zip(types_for_iteration.iter()) { + let type_handle = LogicalTypeHandle::from(match type_id { + LogicalTypeId::Integer => LogicalTypeId::Integer, + LogicalTypeId::Bigint => LogicalTypeId::Bigint, + LogicalTypeId::UInteger => LogicalTypeId::UInteger, + LogicalTypeId::UBigint => LogicalTypeId::UBigint, + LogicalTypeId::Float => LogicalTypeId::Float, + LogicalTypeId::Double => LogicalTypeId::Double, + LogicalTypeId::Varchar => LogicalTypeId::Varchar, + LogicalTypeId::Date => LogicalTypeId::Date, + LogicalTypeId::Timestamp => 
LogicalTypeId::Timestamp, + LogicalTypeId::Boolean => LogicalTypeId::Boolean, + _ => LogicalTypeId::Varchar, + }); + bind.add_result_column(name, type_handle); + } - bind.add_result_column("version", LogicalTypeHandle::from(LogicalTypeId::Varchar)); - - Ok(()) + // Store the bind data after adding columns + unsafe { + ptr::write(data, bind_data); } - unsafe fn init(info: &InitInfo, data: *mut Self::InitData) -> Result<(), Box> { - if data.is_null() { - return Err("Invalid init data pointer".into()); - } + Ok(()) +} - let bind_data = info.get_bind_data::(); - if bind_data.is_null() { - return Err("Invalid bind data".into()); - } +unsafe fn init(info: &InitInfo, data: *mut Self::InitData) -> Result<(), Box> { + if data.is_null() { + return Err("Invalid init data pointer".into()); + } - let runtime = Arc::new(Runtime::new()?); - let runtime_clone = runtime.clone(); - - let result = runtime.block_on(async { - let pool = Pool::new((*bind_data).url.clone()); - let mut client = pool.get_handle().await?; - let block = client.query(&(*bind_data).query).fetch_all().await?; - - let columns = block.columns(); - let mut names = Vec::new(); - let mut types = Vec::new(); - let mut data: Vec> = Vec::new(); - - for col in columns { - names.push(col.name().to_string()); - types.push(map_clickhouse_type(col.sql_type())); - data.push(Vec::new()); - } + let bind_data = info.get_bind_data::(); + if bind_data.is_null() { + return Err("Invalid bind data".into()); + } - let mut row_count = 0; - for row in block.rows() { - for (col_idx, col) in columns.iter().enumerate() { - let value = match col.sql_type() { - SqlType::Int8 | SqlType::Int16 | SqlType::Int32 => { - match row.get::(col.name()) { - Ok(val) => val.to_string(), - Err(_) => "NULL".to_string() - } - }, - SqlType::Int64 => { - match row.get::(col.name()) { - Ok(val) => val.to_string(), - Err(_) => "NULL".to_string() - } - }, - SqlType::UInt8 | SqlType::UInt16 | SqlType::UInt32 => { - match row.get::(col.name()) { - Ok(val) => val.to_string(), - Err(_) => "NULL".to_string() - } - }, - SqlType::UInt64 => { - match row.get::(col.name()) { - Ok(val) => val.to_string(), - Err(_) => "NULL".to_string() - } - }, - SqlType::Float32 => { - match row.get::(col.name()) { - Ok(val) => val.to_string(), - Err(_) => "NULL".to_string() - } - }, - SqlType::Float64 => { - match row.get::(col.name()) { - Ok(val) => val.to_string(), - Err(_) => "NULL".to_string() - } - }, - _ => { - match row.get::(col.name()) { - Ok(val) => val, - Err(_) => "NULL".to_string() - } + let runtime = Arc::new(Runtime::new()?); + + let result = runtime.block_on(async { + let pool = Pool::new((*bind_data).url.clone()); + let mut client = pool.get_handle().await?; + let block = client.query(&(*bind_data).query).fetch_all().await?; + + let columns = block.columns(); + let mut data: Vec> = Vec::new(); + + for _ in columns { + data.push(Vec::new()); + } + + let mut row_count = 0; + for row in block.rows() { + for (col_idx, col) in columns.iter().enumerate() { + let value = match col.sql_type() { + SqlType::UInt8 => { + match row.get::(col.name()) { + Ok(val) => val.to_string(), + Err(_) => "0".to_string() } - }; - data[col_idx].push(value); - } - row_count += 1; + }, + SqlType::UInt16 => { + match row.get::(col.name()) { + Ok(val) => val.to_string(), + Err(_) => "0".to_string() + } + }, + SqlType::UInt32 => { + match row.get::(col.name()) { + Ok(val) => val.to_string(), + Err(_) => "0".to_string() + } + }, + SqlType::UInt64 => { + match row.get::(col.name()) { + Ok(val) => val.to_string(), + Err(_) => 
"0".to_string() + } + }, + SqlType::Int8 => { + match row.get::(col.name()) { + Ok(val) => val.to_string(), + Err(_) => "0".to_string() + } + }, + SqlType::Int16 => { + match row.get::(col.name()) { + Ok(val) => val.to_string(), + Err(_) => "0".to_string() + } + }, + SqlType::Int32 => { + match row.get::(col.name()) { + Ok(val) => val.to_string(), + Err(_) => "0".to_string() + } + }, + SqlType::Int64 => { + match row.get::(col.name()) { + Ok(val) => val.to_string(), + Err(_) => "0".to_string() + } + }, + SqlType::Float32 => { + match row.get::(col.name()) { + Ok(val) => val.to_string(), + Err(_) => "0.0".to_string() + } + }, + SqlType::Float64 => { + match row.get::(col.name()) { + Ok(val) => val.to_string(), + Err(_) => "0.0".to_string() + } + }, + SqlType::String | SqlType::FixedString(_) => { + match row.get::(col.name()) { + Ok(val) => val, + Err(_) => String::new() + } + }, + SqlType::Bool => { + match row.get::(col.name()) { + Ok(val) => val.to_string(), + Err(_) => "false".to_string() + } + }, + SqlType::Date => { + match row.get::(col.name()) { + Ok(val) => val, + Err(_) => "1970-01-01".to_string() + } + }, + SqlType::DateTime(_) => { + match row.get::(col.name()) { + Ok(val) => val, + Err(_) => "1970-01-01 00:00:00".to_string() + } + }, + _ => { + match row.get::(col.name()) { + Ok(val) => val, + Err(_) => "0".to_string() + } + } + }; + data[col_idx].push(value); } - - Ok::<(Vec>, Vec, Vec, usize), Box>((data, names, types, row_count)) - })?; - - let (block_data, column_names, column_types, total_rows) = result; - - unsafe { - (*data) = ClickHouseScanInitData { - runtime: Some(runtime_clone), - block_data: Some(block_data), - column_types: Some(column_types), - column_names: Some(column_names), - current_row: 0, - total_rows, - done: false, - }; + row_count += 1; } - Ok(()) + Ok::<(Vec>, usize), Box>((data, row_count)) + })?; + + let (block_data, total_rows) = result; + + // Create new vectors by mapping over references + let column_types = unsafe { + (*bind_data).column_types.iter().map(|type_id| { + match type_id { + LogicalTypeId::Integer => LogicalTypeId::Integer, + LogicalTypeId::Bigint => LogicalTypeId::Bigint, + LogicalTypeId::UInteger => LogicalTypeId::UInteger, + LogicalTypeId::UBigint => LogicalTypeId::UBigint, + LogicalTypeId::Float => LogicalTypeId::Float, + LogicalTypeId::Double => LogicalTypeId::Double, + LogicalTypeId::Varchar => LogicalTypeId::Varchar, + LogicalTypeId::Date => LogicalTypeId::Date, + LogicalTypeId::Timestamp => LogicalTypeId::Timestamp, + LogicalTypeId::Boolean => LogicalTypeId::Boolean, + _ => LogicalTypeId::Varchar, + } + }).collect::>() + }; + + let column_names = unsafe { (*bind_data).column_names.clone() }; + + // Create init data using ptr::write + unsafe { + ptr::write(data, ClickHouseScanInitData { + runtime: Some(runtime), + block_data: Some(block_data), + column_types, + column_names, + current_row: 0, + total_rows, + done: false, + }); } + Ok(()) +} + + // end patch + unsafe fn func(func: &FunctionInfo, output: &mut DataChunkHandle) -> Result<(), Box> { - let init_data = func.get_init_data::(); - - if init_data.is_null() { - return Err("Invalid init data pointer".into()); - } + let init_data = func.get_init_data::(); + + if init_data.is_null() { + return Err("Invalid init data pointer".into()); + } - unsafe { - if (*init_data).done || (*init_data).current_row >= (*init_data).total_rows { - output.set_len(0); - (*init_data).done = true; - return Ok(()); - } + unsafe { + if (*init_data).done || (*init_data).current_row >= 
(*init_data).total_rows { + output.set_len(0); + (*init_data).done = true; + return Ok(()); + } - let block_data = match (*init_data).block_data.as_ref() { - Some(data) => data, - None => return Err("Block data is not available".into()), - }; - - let column_types = match (*init_data).column_types.as_ref() { - Some(types) => types, - None => return Err("Column types are not available".into()), - }; - - let batch_size = 1024.min((*init_data).total_rows - (*init_data).current_row); - - for col_idx in 0..column_types.len() { - let mut vector = output.flat_vector(col_idx); - let type_id = &column_types[col_idx]; - - match type_id { - LogicalTypeId::Integer | LogicalTypeId::UInteger => { - let slice = vector.as_mut_slice::(); - for row_offset in 0..batch_size { - let row_idx = (*init_data).current_row + row_offset; - if let Ok(val) = block_data[col_idx][row_idx].parse::() { - slice[row_offset] = val; - } else { - slice[row_offset] = 0; - } + let block_data = match (*init_data).block_data.as_ref() { + Some(data) => data, + None => return Err("Block data is not available".into()), + }; + + let column_types = &(*init_data).column_types; + + let batch_size = 1024.min((*init_data).total_rows - (*init_data).current_row); + + for col_idx in 0..column_types.len() { + let mut vector = output.flat_vector(col_idx); + let type_id = &column_types[col_idx]; + + match type_id { + LogicalTypeId::Integer | LogicalTypeId::UInteger => { + let slice = vector.as_mut_slice::(); + for row_offset in 0..batch_size { + let row_idx = (*init_data).current_row + row_offset; + let val_str = &block_data[col_idx][row_idx]; + // println!("Parsing value: {}", val_str); // Debug print + + // Try parsing with different number bases + let val = if let Ok(v) = val_str.parse::() { + v + } else if let Ok(v) = val_str.parse::() { + v as i32 + } else if let Ok(v) = i32::from_str_radix(val_str.trim(), 10) { + v + } else { + println!("Failed to parse: {}", val_str); // Debug print + 0 + }; + slice[row_offset] = val; + } + }, + LogicalTypeId::UInteger => { + let slice = vector.as_mut_slice::(); + for row_offset in 0..batch_size { + let row_idx = (*init_data).current_row + row_offset; + // Try parsing as different unsigned integer types + let val = if let Ok(v) = block_data[col_idx][row_idx].parse::() { + v as i32 + } else if let Ok(v) = block_data[col_idx][row_idx].parse::() { + v as i32 + } else if let Ok(v) = block_data[col_idx][row_idx].parse::() { + v as i32 + } else { + 0 + }; + slice[row_offset] = val; + } + }, + LogicalTypeId::Bigint => { + let slice = vector.as_mut_slice::(); + for row_offset in 0..batch_size { + let row_idx = (*init_data).current_row + row_offset; + if let Ok(val) = block_data[col_idx][row_idx].parse::() { + slice[row_offset] = val; + } else { + slice[row_offset] = 0; } - }, - _ => { - for row_offset in 0..batch_size { - let row_idx = (*init_data).current_row + row_offset; - let val = block_data[col_idx][row_idx].as_str(); - Inserter::insert(&mut vector, row_offset, val); + } + }, + LogicalTypeId::UBigint => { + let slice = vector.as_mut_slice::(); + for row_offset in 0..batch_size { + let row_idx = (*init_data).current_row + row_offset; + if let Ok(val) = block_data[col_idx][row_idx].parse::() { + slice[row_offset] = val as i64; + } else { + slice[row_offset] = 0; } } + }, + _ => { + for row_offset in 0..batch_size { + let row_idx = (*init_data).current_row + row_offset; + let val = block_data[col_idx][row_idx].as_str(); + Inserter::insert(&mut vector, row_offset, val); + } } } - - (*init_data).current_row += 
batch_size;
        output.set_len(batch_size);
    }
    Ok(())
}
    // end func

    fn parameters() -> Option<Vec<LogicalTypeHandle>> {
        Some(vec![LogicalTypeHandle::from(LogicalTypeId::Varchar)])
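        // Illustrative sketch, not part of this patch: bind() above reads the optional
        // named parameters "url", "user" and "password" (falling back to the
        // CLICKHOUSE_URL / CLICKHOUSE_USER / CLICKHOUSE_PASSWORD environment variables,
        // then to tcp://localhost:9000, "default", and an empty password). For reference,
        // declaring those parameters with the duckdb-rs VTab named_parameters() hook
        // would look roughly like this (hypothetical, hedged on that API):
        //
        //     fn named_parameters() -> Option<Vec<(String, LogicalTypeHandle)>> {
        //         Some(vec![
        //             ("url".to_string(), LogicalTypeHandle::from(LogicalTypeId::Varchar)),
        //             ("user".to_string(), LogicalTypeHandle::from(LogicalTypeId::Varchar)),
        //             ("password".to_string(), LogicalTypeHandle::from(LogicalTypeId::Varchar)),
        //         ])
        //     }
        //
        // Assuming the function is registered under the name clickhouse_scan, a call
        // could then look like:
        //
        //     SELECT * FROM clickhouse_scan('SELECT number FROM system.numbers LIMIT 10');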