Skip to content

Commit

Permalink
unsupported handler
Browse files (browse the repository at this point in the history)
  • Loading branch information
lmangani committed Dec 22, 2024
1 parent 771c70c commit d32d38d
Showing 1 changed file with 20 additions and 28 deletions.
48 changes: 20 additions & 28 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ fn read_string(reader: &mut impl Read) -> io::Result<String> {
let len = reader.read_u8()?;
let mut buffer = vec![0; len as usize];
reader.read_exact(&mut buffer)?;
Ok(String::from_utf8_lossy(&buffer).into_owned())
Ok(String::from_utf8_lossy(&buffer)
.replace('\0', "")
.replace('\u{FFFD}', "")
.to_string())
}

fn parse_column_type(type_str: &str) -> (ColumnType, Option<String>) {
Expand Down Expand Up @@ -90,19 +93,18 @@ fn parse_column_type(type_str: &str) -> (ColumnType, Option<String>) {

fn read_column_data(reader: &mut impl Read, column_type: &ColumnType, rows: u64) -> io::Result<Vec<ColumnData>> {
let mut data = Vec::with_capacity(rows as usize);
for i in 0..rows {
for _ in 0..rows {
let value = match column_type {
ColumnType::UInt64 => {
let val = reader.read_u64::<LittleEndian>()?;
// if i < 5 || i == rows - 1 {
// println!("Row {}: {}", i, val);
// }
ColumnData::UInt64(val)
},
ColumnType::String => ColumnData::String(read_string(reader)?),
ColumnType::UInt8 | ColumnType::Enum8 => ColumnData::UInt8(reader.read_u8()?),
ColumnType::Int => ColumnData::Int(reader.read_i32::<LittleEndian>()?),
ColumnType::Unsupported(_) => ColumnData::Null,
ColumnType::Unsupported(type_name) => {
ColumnData::String(format!("<unsupported:{}>", type_name))
}
};
data.push(value);
}
Expand All @@ -126,11 +128,9 @@ fn read_var_u64(reader: &mut impl Read) -> io::Result<u64> {
}

fn skip_block_header(reader: &mut BufReader<File>) -> io::Result<()> {
// Skip the marker
let mut marker = [0u8; 4];
reader.read_exact(&mut marker)?;

// Skip two strings (each prefixed with length)
for _ in 0..2 {
let str_len = reader.read_u8()? as u64;
reader.seek_relative(str_len as i64)?;
Expand All @@ -140,14 +140,10 @@ fn skip_block_header(reader: &mut BufReader<File>) -> io::Result<()> {
}

fn read_native_format(reader: &mut BufReader<File>) -> io::Result<Vec<Column>> {
// Read number of columns
let num_columns = read_var_u64(reader)?;
let mut columns = Vec::new();

// Read block size
let num_rows = read_var_u64(reader)?;

// Read column definitions and first block data
for _ in 0..num_columns {
let name = read_string(reader)?;
let type_str = read_string(reader)?;
Expand All @@ -156,13 +152,11 @@ fn read_native_format(reader: &mut BufReader<File>) -> io::Result<Vec<Column>> {
columns.push(Column { name, type_: column_type, type_params, data });
}

// Read subsequent blocks
loop {
// Try to read number of columns for next block
let pos = reader.stream_position()?;
let _pos = reader.stream_position()?;
let block_columns = match read_var_u64(reader) {
Ok(cols) => cols,
Err(_) => break, // End of file
Err(_) => break,
};

let block_rows = read_var_u64(reader)?;
Expand All @@ -171,13 +165,11 @@ fn read_native_format(reader: &mut BufReader<File>) -> io::Result<Vec<Column>> {
break;
}

// Skip column definitions for the block (they should match)
for _ in 0..block_columns {
let _ = read_string(reader)?; // column name
let _ = read_string(reader)?; // column type
let _ = read_string(reader)?;
let _ = read_string(reader)?;
}

// Read block data
for col in &mut columns {
let mut new_data = read_column_data(reader, &col.type_, block_rows)?;
col.data.append(&mut new_data);
Expand Down Expand Up @@ -260,11 +252,16 @@ impl VTab for ClickHouseVTab {
let mut vector = output.flat_vector(col_idx);

match &column.type_ {
ColumnType::String => {
ColumnType::String | ColumnType::Unsupported(_) => {
for row in 0..batch_size {
let data_idx = (*init_data).current_row + row;
if let ColumnData::String(s) = &column.data[data_idx] {
vector.insert(row, s.as_str());
match &column.data[data_idx] {
ColumnData::String(s) => {
let cleaned = s.replace('\0', "")
.replace('\u{FFFD}', "");
vector.insert(row, cleaned.as_str())
},
_ => vector.insert(row, "<invalid>"),
}
}
},
Expand Down Expand Up @@ -295,11 +292,6 @@ impl VTab for ClickHouseVTab {
}
}
},
ColumnType::Unsupported(_) => {
for row in 0..batch_size {
vector.insert(row, "NULL");
}
},
}
}
(*init_data).current_row += batch_size;
Expand Down

0 comments on commit d32d38d

Please sign in to comment.