Skip to content

Commit

Permalink
refactor!: consolidate on (batches, schema)
Browse files Browse the repository at this point in the history
  • Loading branch information
gadomski committed Aug 13, 2024
1 parent e8a0374 commit 24a4a8c
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/algorithm/native/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl ExplodeTable for Table {
schema_builder.push(field.clone());
let schema = schema_builder.finish();

- Table::try_new(schema.into(), new_batches)
+ Table::try_new(new_batches, schema.into())
} else {
// No take is necessary; nothing happens
Ok(self.clone())
Expand Down
2 changes: 1 addition & 1 deletion src/io/gdal/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub fn read_gdal(layer: &mut Layer, batch_size: Option<usize>) -> Result<Table>
.into_iter()
.collect::<std::result::Result<Vec<RecordBatch>, ArrowError>>()?;

- Table::try_new(schema, batches)
+ Table::try_new(batches, schema)
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion src/io/geozero/table/builder/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl<G: GeometryArrayBuilder + GeomProcessor> GeoTableBuilder<G> {

let batches = self.batches;
let schema = batches[0].schema();
- let mut table = Table::try_new(schema, batches)?;
+ let mut table = Table::try_new(batches, schema)?;

let geom_slices = self
.geom_arrays
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ pub fn read_ipc<R: Read + Seek>(reader: R) -> Result<Table> {
let reader = FileReader::try_new(reader, None)?;
let schema = reader.schema();
let batches = reader.collect::<std::result::Result<Vec<_>, ArrowError>>()?;
- Table::try_new(schema, batches)
+ Table::try_new(batches, schema)
}

/// Read into a Table from Arrow IPC record batch stream.
pub fn read_ipc_stream<R: Read>(reader: R) -> Result<Table> {
let reader = StreamReader::try_new(reader, None)?;
let schema = reader.schema();
let batches = reader.collect::<std::result::Result<Vec<_>, ArrowError>>()?;
- Table::try_new(schema, batches)
+ Table::try_new(batches, schema)
}
2 changes: 1 addition & 1 deletion src/io/parquet/reader/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl<T: AsyncFileReader + Unpin + Send + 'static> GeoParquetRecordBatchStream<T>
pub async fn read_table(self) -> Result<Table> {
let output_schema = self.output_schema.clone();
let batches = self.read_stream().try_collect::<_>().await?;
- Table::try_new(output_schema, batches)
+ Table::try_new(batches, output_schema)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl GeoParquetRecordBatchReader {
pub fn read_table(self) -> Result<Table> {
let output_schema = self.output_schema.clone();
let batches = self.collect::<std::result::Result<Vec<_>, ArrowError>>()?;
- Table::try_new(output_schema, batches)
+ Table::try_new(batches, output_schema)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/io/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl RecordBatchReader {

impl From<Table> for RecordBatchReader {
fn from(value: Table) -> Self {
- let (schema, batches) = value.into_inner();
+ let (batches, schema) = value.into_inner();
Self(Some(Box::new(RecordBatchIterator::new(
batches.into_iter().map(Ok),
schema,
Expand Down
22 changes: 11 additions & 11 deletions src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ impl Table {
/// let schema: SchemaRef = Schema::new(vec![field]).into();
/// let columns = vec![array.into_array_ref()];
/// let batch = RecordBatch::try_new(schema.clone(), columns).unwrap();
- /// let table = Table::try_new(schema, vec![batch]).unwrap();
+ /// let table = Table::try_new(vec![batch], schema).unwrap();
/// ```
- pub fn try_new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Result<Self> {
+ pub fn try_new(batches: Vec<RecordBatch>, schema: SchemaRef) -> Result<Self> {
for batch in batches.iter() {
// Don't check schema metadata in comparisons.
// TODO: I have some issues in the Parquet reader where the batches are missing the
Expand Down Expand Up @@ -116,11 +116,11 @@ impl Table {
/// vec![Arc::new(id_array)]
/// ).unwrap();
///
- /// let table = Table::from_arrow_and_geometry(schema_ref, vec![batch], Arc::new(chunked_array)).unwrap();
+ /// let table = Table::from_arrow_and_geometry(vec![batch], schema_ref, Arc::new(chunked_array)).unwrap();
/// ```
pub fn from_arrow_and_geometry(
- schema: SchemaRef,
batches: Vec<RecordBatch>,
+ schema: SchemaRef,
geometry: Arc<dyn ChunkedGeometryArrayTrait>,
) -> Result<Self> {
if batches.is_empty() {
Expand All @@ -138,7 +138,7 @@ impl Table {
new_batches.push(RecordBatch::try_new(new_schema.clone(), columns)?);
}

- Self::try_new(new_schema, new_batches)
+ Self::try_new(new_batches, new_schema)
}

/// Casts the geometry at `index` to a different data type
Expand Down Expand Up @@ -269,7 +269,7 @@ impl Table {
target_geo_data_type: Option<GeoDataType>,
) -> Result<Self> {
if batches.is_empty() {
- return Self::try_new(schema, batches);
+ return Self::try_new(batches, schema);
}

let num_batches = batches.len();
Expand Down Expand Up @@ -369,7 +369,7 @@ impl Table {
new_record_batches.push(RecordBatch::try_new(new_schema.clone(), new_batch).unwrap());
}

- Table::try_new(new_schema, new_record_batches)
+ Table::try_new(new_record_batches, new_schema)
}

/// Returns the length of this table.
Expand Down Expand Up @@ -419,11 +419,11 @@ impl Table {
///
/// let file = File::open("fixtures/roads.geojson").unwrap();
/// let table = geoarrow::io::geojson::read_geojson(file, Default::default()).unwrap();
- /// let (schema, record_batches) = table.into_inner();
+ /// let (batches, schema) = table.into_inner();
/// # }
/// ```
- pub fn into_inner(self) -> (SchemaRef, Vec<RecordBatch>) {
- (self.schema, self.batches)
+ pub fn into_inner(self) -> (Vec<RecordBatch>, SchemaRef) {
+ (self.batches, self.schema)
}

/// Returns a reference to this table's schema.
Expand Down Expand Up @@ -706,6 +706,6 @@ impl TryFrom<Box<dyn arrow_array::RecordBatchReader>> for Table {
let batches = value
.into_iter()
.collect::<std::result::Result<Vec<_>, ArrowError>>()?;
- Table::try_new(schema, batches)
+ Table::try_new(batches, schema)
}
}
2 changes: 1 addition & 1 deletion src/test/point.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,5 @@ pub(crate) fn table() -> Table {
)
.unwrap();

- Table::try_new(schema, vec![batch]).unwrap()
+ Table::try_new(vec![batch], schema).unwrap()
}

0 comments on commit 24a4a8c

Please sign in to comment.