refactor!: consolidate on (batches, schema)
gadomski committed Aug 13, 2024
1 parent e8a0374 commit 39805e9
Showing 15 changed files with 38 additions and 38 deletions.
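The change is breaking (hence the `refactor!` prefix) but mechanical: `Table::try_new`, `Table::from_arrow_and_geometry`, and `Table::into_inner` now put the record batches before the schema, the same `(batches, schema)` order the Python bindings' `PyTable` already uses (visible as unchanged context in `python/core/src/interop/util.rs` below). Here is a minimal sketch of the new calling convention; the `geoarrow` calls mirror this diff, while the single `Int32` column is an illustrative stand-in, assuming an attribute-only table passes the schema-consistency check shown in `src/table.rs`:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use geoarrow::table::Table;

fn main() {
    // Illustrative setup: one attribute column; the doctest in src/table.rs
    // builds the batch from a geometry array instead.
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let columns: Vec<ArrayRef> = vec![Arc::new(Int32Array::from(vec![1, 2, 3]))];
    let batch = RecordBatch::try_new(schema.clone(), columns).unwrap();

    // After this commit: batches first, then schema.
    let table = Table::try_new(vec![batch], schema).unwrap();

    // into_inner() mirrors the constructor and yields (batches, schema),
    // so deconstruction and reconstruction round-trip without swapping.
    let (batches, schema) = table.into_inner();
    let _roundtrip = Table::try_new(batches, schema).unwrap();
}
```

Before this commit the same code read `Table::try_new(schema, vec![batch])`, and `into_inner()` returned `(SchemaRef, Vec<RecordBatch>)`.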
4 changes: 2 additions & 2 deletions js/src/io/flatgeobuf.rs
@@ -33,6 +33,6 @@ pub fn read_flatgeobuf(file: &[u8], batch_size: Option<usize>) -> WasmResult<Tab
         ..Default::default()
     };
     let geo_table = _read_flatgeobuf(&mut cursor, options)?;
-    let (schema, batches) = geo_table.into_inner();
-    Ok(Table::new(schema, batches))
+    let (batches, schema) = geo_table.into_inner();
+    Ok(Table::new(batches, schema))
 }
8 changes: 4 additions & 4 deletions js/src/io/geojson.rs
@@ -29,17 +29,17 @@ pub fn read_geojson(file: &[u8], batch_size: Option<usize>) -> WasmResult<Table>
     // assert_parquet_file_not_empty(parquet_file)?;
     let mut cursor = Cursor::new(file);
     let geo_table = _read_geojson(&mut cursor, batch_size)?;
-    let (schema, batches) = geo_table.into_inner();
-    Ok(Table::new(schema, batches))
+    let (batches, schema) = geo_table.into_inner();
+    Ok(Table::new(batches, schema))
 }

 /// Write table to GeoJSON
 ///
 /// Note that this consumes the table input
 #[wasm_bindgen(js_name = writeGeoJSON)]
 pub fn write_geojson(table: Table) -> WasmResult<Vec<u8>> {
-    let (schema, batches) = table.into_inner();
-    let rust_table = geoarrow::table::Table::try_new(schema, batches)?;
+    let (batches, schema) = table.into_inner();
+    let rust_table = geoarrow::table::Table::try_new(batches, schema)?;
     let mut output_file: Vec<u8> = vec![];
     _write_geojson(rust_table, &mut output_file)?;
     Ok(output_file)
8 changes: 4 additions & 4 deletions js/src/io/parquet/async.rs
@@ -135,8 +135,8 @@ impl ParquetFile {
         )
         .build()?;
         let table = stream.read_table().await?;
-        let (schema, batches) = table.into_inner();
-        Ok(Table::new(schema, batches))
+        let (batches, schema) = table.into_inner();
+        Ok(Table::new(batches, schema))
     }
     #[wasm_bindgen]
     pub async fn read_stream(
@@ -271,8 +271,8 @@ impl ParquetDataset {
             all_batches.extend(table_batches);
         });
         let table = geoarrow::table::Table::try_new(output_schema, all_batches)?;
-        let (schema, batches) = table.into_inner();
-        Ok(Table::new(schema, batches))
+        let (batches, schema) = table.into_inner();
+        Ok(Table::new(batches, schema))
     }

     // TODO: reimplement this. Now that we have a vec of readers under the hood, we need to combine
8 changes: 4 additions & 4 deletions js/src/io/parquet/sync.rs
@@ -37,17 +37,17 @@ pub fn read_geoparquet(file: Vec<u8>) -> WasmResult<Table> {
     )?
     .build()?;
     let geo_table = reader.read_table()?;
-    let (schema, batches) = geo_table.into_inner();
-    Ok(Table::new(schema, batches))
+    let (batches, schema) = geo_table.into_inner();
+    Ok(Table::new(batches, schema))
 }

 /// Write table to GeoParquet
 ///
 /// Note that this consumes the table input
 #[wasm_bindgen(js_name = writeGeoParquet)]
 pub fn write_geoparquet(table: Table) -> WasmResult<Vec<u8>> {
-    let (schema, batches) = table.into_inner();
-    let mut rust_table = geoarrow::table::Table::try_new(schema, batches)?;
+    let (batches, schema) = table.into_inner();
+    let mut rust_table = geoarrow::table::Table::try_new(batches, schema)?;
     let mut output_file: Vec<u8> = vec![];
     _write_geoparquet(&mut rust_table, &mut output_file, &Default::default())?;
     Ok(output_file)
2 changes: 1 addition & 1 deletion python/core/src/ffi/from_python/table.rs
@@ -20,7 +20,7 @@ impl<'a> FromPyObject<'a> for GeoTable {
             batches.push(batch);
         }

-        let table = geoarrow::table::Table::try_new(schema, batches)
+        let table = geoarrow::table::Table::try_new(batches, schema)
             .map_err(|e| PyValueError::new_err(e.to_string()))?;
         let table = table
             .downcast(true)
4 changes: 2 additions & 2 deletions python/core/src/interop/util.rs
@@ -50,11 +50,11 @@ pub(crate) fn import_pyogrio(py: Python) -> PyGeoArrowResult<Bound<PyModule>> {
 }

 pub(crate) fn table_to_pytable(table: geoarrow::table::Table) -> PyTable {
-    let (schema, batches) = table.into_inner();
+    let (batches, schema) = table.into_inner();
     PyTable::new(batches, schema)
 }

 pub(crate) fn pytable_to_table(table: PyTable) -> Result<geoarrow::table::Table, GeoArrowError> {
     let (batches, schema) = table.into_inner();
-    geoarrow::table::Table::try_new(schema, batches)
+    geoarrow::table::Table::try_new(batches, schema)
 }
2 changes: 1 addition & 1 deletion src/algorithm/native/explode.rs
@@ -286,7 +286,7 @@ impl ExplodeTable for Table {
             schema_builder.push(field.clone());
             let schema = schema_builder.finish();

-            Table::try_new(schema.into(), new_batches)
+            Table::try_new(new_batches, schema.into())
         } else {
             // No take is necessary; nothing happens
             Ok(self.clone())
2 changes: 1 addition & 1 deletion src/io/gdal/reader.rs
@@ -40,7 +40,7 @@ pub fn read_gdal(layer: &mut Layer, batch_size: Option<usize>) -> Result<Table>
         .into_iter()
         .collect::<std::result::Result<Vec<RecordBatch>, ArrowError>>()?;

-    Table::try_new(schema, batches)
+    Table::try_new(batches, schema)
 }

 #[cfg(test)]
2 changes: 1 addition & 1 deletion src/io/geozero/table/builder/table.rs
@@ -204,7 +204,7 @@ impl<G: GeometryArrayBuilder + GeomProcessor> GeoTableBuilder<G> {

         let batches = self.batches;
         let schema = batches[0].schema();
-        let mut table = Table::try_new(schema, batches)?;
+        let mut table = Table::try_new(batches, schema)?;

         let geom_slices = self
             .geom_arrays
4 changes: 2 additions & 2 deletions src/io/ipc/reader.rs
@@ -11,13 +11,13 @@ pub fn read_ipc<R: Read + Seek>(reader: R) -> Result<Table> {
     let reader = FileReader::try_new(reader, None)?;
     let schema = reader.schema();
     let batches = reader.collect::<std::result::Result<Vec<_>, ArrowError>>()?;
-    Table::try_new(schema, batches)
+    Table::try_new(batches, schema)
 }

 /// Read into a Table from Arrow IPC record batch stream.
 pub fn read_ipc_stream<R: Read>(reader: R) -> Result<Table> {
     let reader = StreamReader::try_new(reader, None)?;
     let schema = reader.schema();
     let batches = reader.collect::<std::result::Result<Vec<_>, ArrowError>>()?;
-    Table::try_new(schema, batches)
+    Table::try_new(batches, schema)
 }
2 changes: 1 addition & 1 deletion src/io/parquet/reader/async.rs
@@ -152,7 +152,7 @@ impl<T: AsyncFileReader + Unpin + Send + 'static> GeoParquetRecordBatchStream<T>
     pub async fn read_table(self) -> Result<Table> {
         let output_schema = self.output_schema.clone();
         let batches = self.read_stream().try_collect::<_>().await?;
-        Table::try_new(output_schema, batches)
+        Table::try_new(batches, output_schema)
     }
 }
2 changes: 1 addition & 1 deletion src/io/parquet/reader/builder.rs
@@ -127,7 +127,7 @@ impl GeoParquetRecordBatchReader {
     pub fn read_table(self) -> Result<Table> {
         let output_schema = self.output_schema.clone();
         let batches = self.collect::<std::result::Result<Vec<_>, ArrowError>>()?;
-        Table::try_new(output_schema, batches)
+        Table::try_new(batches, output_schema)
     }
 }
2 changes: 1 addition & 1 deletion src/io/stream.rs
@@ -27,7 +27,7 @@ impl RecordBatchReader {

 impl From<Table> for RecordBatchReader {
     fn from(value: Table) -> Self {
-        let (schema, batches) = value.into_inner();
+        let (batches, schema) = value.into_inner();
         Self(Some(Box::new(RecordBatchIterator::new(
             batches.into_iter().map(Ok),
             schema,
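The `From<Table>` impl above now feeds `into_inner()`'s tuple straight into arrow-rs, whose `RecordBatchIterator::new(batches, schema)` already takes batches first, so the destructured names arrive in the same order they are consumed. A standalone sketch of that upstream API; plain Arrow only, and the `id` column is illustrative:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{DataType, Field, Schema, SchemaRef};

fn main() {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let columns: Vec<ArrayRef> = vec![Arc::new(Int32Array::from(vec![1, 2, 3]))];
    let batch = RecordBatch::try_new(schema.clone(), columns).unwrap();

    // arrow-rs puts the batch iterator first and the schema second -- the same
    // (batches, schema) order this commit consolidates on.
    let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema);
    assert_eq!(reader.schema().fields().len(), 1);
}
```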
24 changes: 12 additions & 12 deletions src/table.rs
@@ -62,9 +62,9 @@ impl Table {
     /// let schema: SchemaRef = Schema::new(vec![field]).into();
     /// let columns = vec![array.into_array_ref()];
     /// let batch = RecordBatch::try_new(schema.clone(), columns).unwrap();
-    /// let table = Table::try_new(schema, vec![batch]).unwrap();
+    /// let table = Table::try_new(vec![batch], schema).unwrap();
     /// ```
-    pub fn try_new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Result<Self> {
+    pub fn try_new(batches: Vec<RecordBatch>, schema: SchemaRef) -> Result<Self> {
         for batch in batches.iter() {
             // Don't check schema metadata in comparisons.
             // TODO: I have some issues in the Parquet reader where the batches are missing the
@@ -80,7 +80,7 @@
             }
         }

-        Ok(Self { schema, batches })
+        Ok(Self { batches, schema })
     }

     /// Creates a new table from a schema, a vector of record batches, and a chunked geometry array.
@@ -116,11 +116,11 @@
     ///     vec![Arc::new(id_array)]
     /// ).unwrap();
     ///
-    /// let table = Table::from_arrow_and_geometry(schema_ref, vec![batch], Arc::new(chunked_array)).unwrap();
+    /// let table = Table::from_arrow_and_geometry(vec![batch], schema_ref, Arc::new(chunked_array)).unwrap();
     /// ```
     pub fn from_arrow_and_geometry(
-        schema: SchemaRef,
         batches: Vec<RecordBatch>,
+        schema: SchemaRef,
         geometry: Arc<dyn ChunkedGeometryArrayTrait>,
     ) -> Result<Self> {
         if batches.is_empty() {
@@ -138,7 +138,7 @@
             new_batches.push(RecordBatch::try_new(new_schema.clone(), columns)?);
         }

-        Self::try_new(new_schema, new_batches)
+        Self::try_new(new_batches, new_schema)
     }

     /// Casts the geometry at `index` to a different data type
@@ -269,7 +269,7 @@
         target_geo_data_type: Option<GeoDataType>,
     ) -> Result<Self> {
         if batches.is_empty() {
-            return Self::try_new(schema, batches);
+            return Self::try_new(batches, schema);
         }

         let num_batches = batches.len();
@@ -369,7 +369,7 @@
             new_record_batches.push(RecordBatch::try_new(new_schema.clone(), new_batch).unwrap());
         }

-        Table::try_new(new_schema, new_record_batches)
+        Table::try_new(new_record_batches, new_schema)
     }

     /// Returns the length of this table.
@@ -419,11 +419,11 @@
     ///
     /// let file = File::open("fixtures/roads.geojson").unwrap();
     /// let table = geoarrow::io::geojson::read_geojson(file, Default::default()).unwrap();
-    /// let (schema, record_batches) = table.into_inner();
+    /// let (batches, schema) = table.into_inner();
     /// # }
     /// ```
-    pub fn into_inner(self) -> (SchemaRef, Vec<RecordBatch>) {
-        (self.schema, self.batches)
+    pub fn into_inner(self) -> (Vec<RecordBatch>, SchemaRef) {
+        (self.batches, self.schema)
     }

     /// Returns a reference to this table's schema.
@@ -706,6 +706,6 @@ impl TryFrom<Box<dyn arrow_array::RecordBatchReader>> for Table {
         let batches = value
             .into_iter()
             .collect::<std::result::Result<Vec<_>, ArrowError>>()?;
-        Table::try_new(schema, batches)
+        Table::try_new(batches, schema)
     }
 }
2 changes: 1 addition & 1 deletion src/test/point.rs
@@ -53,5 +53,5 @@ pub(crate) fn table() -> Table {
     )
     .unwrap();

-    Table::try_new(schema, vec![batch]).unwrap()
+    Table::try_new(vec![batch], schema).unwrap()
 }
