diff --git a/src/lib.rs b/src/lib.rs index 6441e72..b4d5ec6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,7 +58,10 @@ fn read_string(reader: &mut impl Read) -> io::Result { let len = reader.read_u8()?; let mut buffer = vec![0; len as usize]; reader.read_exact(&mut buffer)?; - Ok(String::from_utf8_lossy(&buffer).into_owned()) + Ok(String::from_utf8_lossy(&buffer) + .replace('\0', "") + .replace('\u{FFFD}', "") + .to_string()) } fn parse_column_type(type_str: &str) -> (ColumnType, Option) { @@ -90,19 +93,18 @@ fn parse_column_type(type_str: &str) -> (ColumnType, Option) { fn read_column_data(reader: &mut impl Read, column_type: &ColumnType, rows: u64) -> io::Result> { let mut data = Vec::with_capacity(rows as usize); - for i in 0..rows { + for _ in 0..rows { let value = match column_type { ColumnType::UInt64 => { let val = reader.read_u64::()?; - // if i < 5 || i == rows - 1 { - // println!("Row {}: {}", i, val); - // } ColumnData::UInt64(val) }, ColumnType::String => ColumnData::String(read_string(reader)?), ColumnType::UInt8 | ColumnType::Enum8 => ColumnData::UInt8(reader.read_u8()?), ColumnType::Int => ColumnData::Int(reader.read_i32::()?), - ColumnType::Unsupported(_) => ColumnData::Null, + ColumnType::Unsupported(type_name) => { + ColumnData::String(format!("", type_name)) + } }; data.push(value); } @@ -126,11 +128,9 @@ fn read_var_u64(reader: &mut impl Read) -> io::Result { } fn skip_block_header(reader: &mut BufReader) -> io::Result<()> { - // Skip the marker let mut marker = [0u8; 4]; reader.read_exact(&mut marker)?; - // Skip two strings (each prefixed with length) for _ in 0..2 { let str_len = reader.read_u8()? as u64; reader.seek_relative(str_len as i64)?; @@ -140,14 +140,10 @@ fn skip_block_header(reader: &mut BufReader) -> io::Result<()> { } fn read_native_format(reader: &mut BufReader) -> io::Result> { - // Read number of columns let num_columns = read_var_u64(reader)?; let mut columns = Vec::new(); - - // Read block size let num_rows = read_var_u64(reader)?; - // Read column definitions and first block data for _ in 0..num_columns { let name = read_string(reader)?; let type_str = read_string(reader)?; @@ -156,13 +152,11 @@ fn read_native_format(reader: &mut BufReader) -> io::Result> { columns.push(Column { name, type_: column_type, type_params, data }); } - // Read subsequent blocks loop { - // Try to read number of columns for next block - let pos = reader.stream_position()?; + let _pos = reader.stream_position()?; let block_columns = match read_var_u64(reader) { Ok(cols) => cols, - Err(_) => break, // End of file + Err(_) => break, }; let block_rows = read_var_u64(reader)?; @@ -171,13 +165,11 @@ fn read_native_format(reader: &mut BufReader) -> io::Result> { break; } - // Skip column definitions for the block (they should match) for _ in 0..block_columns { - let _ = read_string(reader)?; // column name - let _ = read_string(reader)?; // column type + let _ = read_string(reader)?; + let _ = read_string(reader)?; } - // Read block data for col in &mut columns { let mut new_data = read_column_data(reader, &col.type_, block_rows)?; col.data.append(&mut new_data); @@ -260,11 +252,16 @@ impl VTab for ClickHouseVTab { let mut vector = output.flat_vector(col_idx); match &column.type_ { - ColumnType::String => { + ColumnType::String | ColumnType::Unsupported(_) => { for row in 0..batch_size { let data_idx = (*init_data).current_row + row; - if let ColumnData::String(s) = &column.data[data_idx] { - vector.insert(row, s.as_str()); + match &column.data[data_idx] { + ColumnData::String(s) => { + let cleaned = s.replace('\0', "") + .replace('\u{FFFD}', ""); + vector.insert(row, cleaned.as_str()) + }, + _ => vector.insert(row, ""), } } }, @@ -295,11 +292,6 @@ impl VTab for ClickHouseVTab { } } }, - ColumnType::Unsupported(_) => { - for row in 0..batch_size { - vector.insert(row, "NULL"); - } - }, } } (*init_data).current_row += batch_size;