Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust/drivers/datafusion): add support for bulk ingest #2279

Merged
merged 5 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 195 additions & 53 deletions rust/drivers/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@

#![allow(refining_impl_trait)]

use adbc_core::ffi::constants;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::TableType;
use datafusion::prelude::*;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::substrait::proto::Plan;
use prost::Message;
use std::cell::RefCell;
use std::fmt::Debug;
use std::sync::Arc;
use std::vec::IntoIter;
use std::{collections::HashMap, fmt::Debug};
use tokio::runtime::Runtime;

use arrow_array::builder::{
Expand Down Expand Up @@ -113,9 +116,7 @@ impl Driver for DataFusionDriver {
type DatabaseType = DataFusionDatabase;

/// Creates a database handle with no options applied.
///
/// Only the constructor matching the field-less `DataFusionDatabase`
/// definition is kept; the stale `options` initializer is dropped.
fn new_database(&mut self) -> Result<Self::DatabaseType> {
    Ok(Self::DatabaseType {})
}

fn new_database_with_opts(
Expand All @@ -127,46 +128,56 @@ impl Driver for DataFusionDriver {
),
>,
) -> adbc_core::error::Result<Self::DatabaseType> {
let mut database = Self::DatabaseType {
options: HashMap::new(),
};
let mut database = Self::DatabaseType {};
for (key, value) in opts {
database.set_option(key, value)?;
}
Ok(database)
}
}

pub struct DataFusionDatabase {
options: HashMap<OptionDatabase, OptionValue>,
}
/// Database handle for the DataFusion driver; it has no fields.
pub struct DataFusionDatabase {}

impl Optionable for DataFusionDatabase {
type Option = OptionDatabase;

fn set_option(
&mut self,
_key: Self::Option,
key: Self::Option,
_value: adbc_core::options::OptionValue,
) -> adbc_core::error::Result<()> {
self.options.insert(_key, _value);
Ok(())
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_string(&self, _key: Self::Option) -> adbc_core::error::Result<String> {
todo!()
fn get_option_string(&self, key: Self::Option) -> adbc_core::error::Result<String> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_bytes(&self, _key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
todo!()
fn get_option_bytes(&self, key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_int(&self, _key: Self::Option) -> adbc_core::error::Result<i64> {
todo!()
fn get_option_int(&self, key: Self::Option) -> adbc_core::error::Result<i64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_double(&self, _key: Self::Option) -> adbc_core::error::Result<f64> {
todo!()
fn get_option_double(&self, key: Self::Option) -> adbc_core::error::Result<f64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}
}

Expand All @@ -189,7 +200,7 @@ impl Database for DataFusionDatabase {

fn new_connection_with_opts(
&mut self,
_opts: impl IntoIterator<
opts: impl IntoIterator<
Item = (
adbc_core::options::OptionConnection,
adbc_core::options::OptionValue,
Expand All @@ -203,10 +214,16 @@ impl Database for DataFusionDatabase {
.build()
.unwrap();

Ok(DataFusionConnection {
let mut connection = DataFusionConnection {
runtime: Arc::new(runtime),
ctx: Arc::new(ctx),
})
};

for (key, value) in opts {
connection.set_option(key, value)?;
}

Ok(connection)
}
}

Expand All @@ -223,23 +240,86 @@ impl Optionable for DataFusionConnection {
_key: Self::Option,
_value: adbc_core::options::OptionValue,
) -> adbc_core::error::Result<()> {
todo!()
match _key.as_ref() {
constants::ADBC_CONNECTION_OPTION_CURRENT_CATALOG => match _value {
tokoko marked this conversation as resolved.
Show resolved Hide resolved
OptionValue::String(value) => {
self.runtime.block_on(async {
let query = format!(
"SET datafusion.catalog.default_catalog = {}",
value.as_str()
tokoko marked this conversation as resolved.
Show resolved Hide resolved
);
self.ctx.sql(query.as_str()).await.unwrap();
});
Ok(())
}
_ => Err(Error::with_message_and_status(
"CurrentCatalog value must be of type String",
Status::InvalidArguments,
)),
},
constants::ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA => match _value {
OptionValue::String(value) => {
self.runtime.block_on(async {
let query =
format!("SET datafusion.catalog.default_schema = {}", value.as_str());
tokoko marked this conversation as resolved.
Show resolved Hide resolved
self.ctx.sql(query.as_str()).await.unwrap();
});
Ok(())
}
_ => Err(Error::with_message_and_status(
"CurrentSchema value must be of type String",
Status::InvalidArguments,
)),
},
_ => Err(Error::with_message_and_status(
format!("Unrecognized option: {_key:?}"),
Status::NotFound,
)),
}
}

fn get_option_string(&self, _key: Self::Option) -> adbc_core::error::Result<String> {
todo!()
fn get_option_string(&self, key: Self::Option) -> adbc_core::error::Result<String> {
match key.as_ref() {
constants::ADBC_CONNECTION_OPTION_CURRENT_CATALOG => Ok(self
.ctx
.state()
.config_options()
.catalog
.default_catalog
.clone()),
constants::ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA => Ok(self
.ctx
.state()
.config_options()
.catalog
.default_schema
.clone()),
_ => Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
)),
}
}

fn get_option_bytes(&self, _key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
todo!()
fn get_option_bytes(&self, key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_int(&self, _key: Self::Option) -> adbc_core::error::Result<i64> {
todo!()
fn get_option_int(&self, key: Self::Option) -> adbc_core::error::Result<i64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_double(&self, _key: Self::Option) -> adbc_core::error::Result<f64> {
todo!()
fn get_option_double(&self, key: Self::Option) -> adbc_core::error::Result<f64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}
}

Expand Down Expand Up @@ -622,6 +702,8 @@ impl Connection for DataFusionConnection {
ctx: self.ctx.clone(),
sql_query: None,
substrait_plan: None,
bound_record_batch: RefCell::new(None),
ingest_target_table: None,
})
}

Expand Down Expand Up @@ -707,39 +789,81 @@ pub struct DataFusionStatement {
ctx: Arc<SessionContext>,
sql_query: Option<String>,
substrait_plan: Option<Plan>,
bound_record_batch: RefCell<Option<RecordBatch>>,
tokoko marked this conversation as resolved.
Show resolved Hide resolved
ingest_target_table: Option<String>,
}

impl Optionable for DataFusionStatement {
type Option = OptionStatement;

fn set_option(
&mut self,
_key: Self::Option,
_value: adbc_core::options::OptionValue,
key: Self::Option,
value: adbc_core::options::OptionValue,
) -> adbc_core::error::Result<()> {
todo!()
match key.as_ref() {
constants::ADBC_INGEST_OPTION_TARGET_TABLE => match value {
OptionValue::String(value) => {
self.ingest_target_table = Some(value);
Ok(())
}
_ => Err(Error::with_message_and_status(
"IngestOptionTargetTable value must be of type String",
Status::InvalidArguments,
)),
},
_ => Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
)),
}
}

fn get_option_string(&self, _key: Self::Option) -> adbc_core::error::Result<String> {
todo!()
fn get_option_string(&self, key: Self::Option) -> adbc_core::error::Result<String> {
match key.as_ref() {
constants::ADBC_INGEST_OPTION_TARGET_TABLE => {
let target_table = self.ingest_target_table.clone();
match target_table {
Some(table) => Ok(table),
None => Err(Error::with_message_and_status(
format!("{} has not been set", key.as_ref()),
Status::NotFound,
)),
}
}
_ => Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
)),
}
}

fn get_option_bytes(&self, _key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
todo!()
fn get_option_bytes(&self, key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_int(&self, _key: Self::Option) -> adbc_core::error::Result<i64> {
todo!()
fn get_option_int(&self, key: Self::Option) -> adbc_core::error::Result<i64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_double(&self, _key: Self::Option) -> adbc_core::error::Result<f64> {
todo!()
fn get_option_double(&self, key: Self::Option) -> adbc_core::error::Result<f64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}
}

impl Statement for DataFusionStatement {
fn bind(&mut self, _batch: arrow_array::RecordBatch) -> adbc_core::error::Result<()> {
todo!()
/// Binds a single record batch to this statement.
///
/// The batch is stored in `bound_record_batch`, replacing any batch that was
/// bound earlier. Always returns `Ok(())`.
fn bind(&mut self, batch: arrow_array::RecordBatch) -> adbc_core::error::Result<()> {
    // `&mut self` already guarantees exclusive access, so write through
    // `get_mut` instead of taking a runtime-checked `RefCell` borrow.
    *self.bound_record_batch.get_mut() = Some(batch);
    Ok(())
}

fn bind_stream(
Expand Down Expand Up @@ -768,13 +892,31 @@ impl Statement for DataFusionStatement {
}

fn execute_update(&mut self) -> adbc_core::error::Result<Option<i64>> {
self.runtime.block_on(async {
let _ = self
.ctx
.sql(&self.sql_query.clone().unwrap())
.await
.unwrap();
});
if self.sql_query.is_some() {
self.runtime.block_on(async {
let _ = self
.ctx
.sql(&self.sql_query.clone().unwrap())
.await
.unwrap();
});
} else if self.bound_record_batch.get_mut().is_some() {
self.runtime.block_on(async {
let batch: RecordBatch = self.bound_record_batch.replace(None).unwrap();
tokoko marked this conversation as resolved.
Show resolved Hide resolved

let table = match self.ingest_target_table.clone() {
Some(table) => table,
None => todo!(),
};

self.ctx
.read_batch(batch)
.unwrap()
.write_table(table.as_str(), DataFrameWriteOptions::new())
.await
.unwrap();
});
}

Ok(Some(0))
}
Expand Down
Loading
Loading