Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust/drivers/datafusion): add support for bulk ingest #2279

Merged
merged 5 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 190 additions & 55 deletions rust/drivers/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@

#![allow(refining_impl_trait)]

use adbc_core::ffi::constants;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::TableType;
use datafusion::prelude::*;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::substrait::proto::Plan;
use prost::Message;
use std::fmt::Debug;
use std::sync::Arc;
use std::vec::IntoIter;
use std::{collections::HashMap, fmt::Debug};
use tokio::runtime::Runtime;

use arrow_array::builder::{
Expand Down Expand Up @@ -113,9 +115,7 @@ impl Driver for DataFusionDriver {
type DatabaseType = DataFusionDatabase;

/// Creates a database handle with no options applied.
///
/// The database type is an empty struct, so there is nothing to initialise
/// and this call always succeeds.
fn new_database(&mut self) -> Result<Self::DatabaseType> {
    Ok(Self::DatabaseType {})
}

fn new_database_with_opts(
Expand All @@ -127,46 +127,56 @@ impl Driver for DataFusionDriver {
),
>,
) -> adbc_core::error::Result<Self::DatabaseType> {
let mut database = Self::DatabaseType {
options: HashMap::new(),
};
let mut database = Self::DatabaseType {};
for (key, value) in opts {
database.set_option(key, value)?;
}
Ok(database)
}
}

/// ADBC database handle for the DataFusion driver.
///
/// The struct carries no fields; it exists to satisfy the ADBC
/// driver -> database -> connection hierarchy.
pub struct DataFusionDatabase {}

impl Optionable for DataFusionDatabase {
type Option = OptionDatabase;

fn set_option(
&mut self,
_key: Self::Option,
key: Self::Option,
_value: adbc_core::options::OptionValue,
) -> adbc_core::error::Result<()> {
self.options.insert(_key, _value);
Ok(())
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_string(&self, _key: Self::Option) -> adbc_core::error::Result<String> {
todo!()
fn get_option_string(&self, key: Self::Option) -> adbc_core::error::Result<String> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_bytes(&self, _key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
todo!()
fn get_option_bytes(&self, key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_int(&self, _key: Self::Option) -> adbc_core::error::Result<i64> {
todo!()
fn get_option_int(&self, key: Self::Option) -> adbc_core::error::Result<i64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_double(&self, _key: Self::Option) -> adbc_core::error::Result<f64> {
todo!()
fn get_option_double(&self, key: Self::Option) -> adbc_core::error::Result<f64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}
}

Expand All @@ -189,7 +199,7 @@ impl Database for DataFusionDatabase {

fn new_connection_with_opts(
&mut self,
_opts: impl IntoIterator<
opts: impl IntoIterator<
Item = (
adbc_core::options::OptionConnection,
adbc_core::options::OptionValue,
Expand All @@ -203,10 +213,16 @@ impl Database for DataFusionDatabase {
.build()
.unwrap();

Ok(DataFusionConnection {
let mut connection = DataFusionConnection {
runtime: Arc::new(runtime),
ctx: Arc::new(ctx),
})
};

for (key, value) in opts {
connection.set_option(key, value)?;
}

Ok(connection)
}
}

Expand All @@ -220,26 +236,85 @@ impl Optionable for DataFusionConnection {

fn set_option(
&mut self,
_key: Self::Option,
_value: adbc_core::options::OptionValue,
key: Self::Option,
value: adbc_core::options::OptionValue,
) -> adbc_core::error::Result<()> {
todo!()
match key.as_ref() {
constants::ADBC_CONNECTION_OPTION_CURRENT_CATALOG => match value {
OptionValue::String(value) => {
self.runtime.block_on(async {
let query = format!("SET datafusion.catalog.default_catalog = {value}");
self.ctx.sql(query.as_str()).await.unwrap();
});
Ok(())
}
_ => Err(Error::with_message_and_status(
"CurrentCatalog value must be of type String",
Status::InvalidArguments,
)),
},
constants::ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA => match value {
OptionValue::String(value) => {
self.runtime.block_on(async {
let query = format!("SET datafusion.catalog.default_schema = {value}");
self.ctx.sql(query.as_str()).await.unwrap();
});
Ok(())
}
_ => Err(Error::with_message_and_status(
"CurrentSchema value must be of type String",
Status::InvalidArguments,
)),
},
_ => Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
)),
}
}

fn get_option_string(&self, _key: Self::Option) -> adbc_core::error::Result<String> {
todo!()
fn get_option_string(&self, key: Self::Option) -> adbc_core::error::Result<String> {
match key.as_ref() {
constants::ADBC_CONNECTION_OPTION_CURRENT_CATALOG => Ok(self
.ctx
.state()
.config_options()
.catalog
.default_catalog
.clone()),
constants::ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA => Ok(self
.ctx
.state()
.config_options()
.catalog
.default_schema
.clone()),
_ => Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
)),
}
}

fn get_option_bytes(&self, _key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
todo!()
fn get_option_bytes(&self, key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_int(&self, _key: Self::Option) -> adbc_core::error::Result<i64> {
todo!()
fn get_option_int(&self, key: Self::Option) -> adbc_core::error::Result<i64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_double(&self, _key: Self::Option) -> adbc_core::error::Result<f64> {
todo!()
fn get_option_double(&self, key: Self::Option) -> adbc_core::error::Result<f64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}
}

Expand Down Expand Up @@ -622,6 +697,8 @@ impl Connection for DataFusionConnection {
ctx: self.ctx.clone(),
sql_query: None,
substrait_plan: None,
bound_record_batch: None,
ingest_target_table: None,
})
}

Expand Down Expand Up @@ -707,39 +784,81 @@ pub struct DataFusionStatement {
ctx: Arc<SessionContext>,
sql_query: Option<String>,
substrait_plan: Option<Plan>,
bound_record_batch: Option<RecordBatch>,
ingest_target_table: Option<String>,
}

impl Optionable for DataFusionStatement {
type Option = OptionStatement;

fn set_option(
&mut self,
_key: Self::Option,
_value: adbc_core::options::OptionValue,
key: Self::Option,
value: adbc_core::options::OptionValue,
) -> adbc_core::error::Result<()> {
todo!()
match key.as_ref() {
constants::ADBC_INGEST_OPTION_TARGET_TABLE => match value {
OptionValue::String(value) => {
self.ingest_target_table = Some(value);
Ok(())
}
_ => Err(Error::with_message_and_status(
"IngestOptionTargetTable value must be of type String",
Status::InvalidArguments,
)),
},
_ => Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
)),
}
}

fn get_option_string(&self, _key: Self::Option) -> adbc_core::error::Result<String> {
todo!()
fn get_option_string(&self, key: Self::Option) -> adbc_core::error::Result<String> {
match key.as_ref() {
constants::ADBC_INGEST_OPTION_TARGET_TABLE => {
let target_table = self.ingest_target_table.clone();
match target_table {
Some(table) => Ok(table),
None => Err(Error::with_message_and_status(
format!("{key:?} has not been set"),
Status::NotFound,
)),
}
}
_ => Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
)),
}
}

fn get_option_bytes(&self, _key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
todo!()
fn get_option_bytes(&self, key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_int(&self, _key: Self::Option) -> adbc_core::error::Result<i64> {
todo!()
fn get_option_int(&self, key: Self::Option) -> adbc_core::error::Result<i64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_double(&self, _key: Self::Option) -> adbc_core::error::Result<f64> {
todo!()
fn get_option_double(&self, key: Self::Option) -> adbc_core::error::Result<f64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}
}

impl Statement for DataFusionStatement {
fn bind(&mut self, _batch: arrow_array::RecordBatch) -> adbc_core::error::Result<()> {
todo!()
fn bind(&mut self, batch: arrow_array::RecordBatch) -> adbc_core::error::Result<()> {
self.bound_record_batch.replace(batch);
Ok(())
}

fn bind_stream(
Expand Down Expand Up @@ -768,13 +887,29 @@ impl Statement for DataFusionStatement {
}

fn execute_update(&mut self) -> adbc_core::error::Result<Option<i64>> {
self.runtime.block_on(async {
let _ = self
.ctx
.sql(&self.sql_query.clone().unwrap())
.await
.unwrap();
});
if self.sql_query.is_some() {
self.runtime.block_on(async {
let _ = self
.ctx
.sql(&self.sql_query.clone().unwrap())
.await
.unwrap();
});
} else if let Some(batch) = self.bound_record_batch.take() {
self.runtime.block_on(async {
let table = match self.ingest_target_table.clone() {
Some(table) => table,
None => todo!(),
};

self.ctx
.read_batch(batch)
.unwrap()
.write_table(table.as_str(), DataFrameWriteOptions::new())
.await
.unwrap();
});
}

Ok(Some(0))
}
Expand Down
Loading
Loading