From a1b821f47b71861c00e98ba652a418b25ece6b56 Mon Sep 17 00:00:00 2001
From: David Li
Date: Mon, 26 Jun 2023 12:22:15 -0400
Subject: [PATCH] feat(c/driver/postgresql): implement ADBC 1.1.0 features
- ADBC_INFO_DRIVER_ADBC_VERSION
- StatementExecuteSchema (#318)
- ADBC_CONNECTION_OPTION_CURRENT_{CATALOG, DB_SCHEMA) (#319)
- ConnectionCancel/StatementCancel
- GetOption/SetOption
- Ingest modes
---
adbc.h | 6 +-
c/driver/common/utils.c | 13 +
c/driver/common/utils.h | 3 +
c/driver/flightsql/dremio_flightsql_test.cc | 3 +-
c/driver/flightsql/sqlite_flightsql_test.cc | 3 +-
c/driver/postgresql/connection.cc | 111 ++++-
c/driver/postgresql/connection.h | 17 +-
c/driver/postgresql/database.cc | 35 ++
c/driver/postgresql/database.h | 12 +
c/driver/postgresql/postgres_copy_reader.h | 8 +-
c/driver/postgresql/postgresql.cc | 398 +++++++++++++++++-
c/driver/postgresql/postgresql_test.cc | 121 ++++--
c/driver/postgresql/statement.cc | 241 ++++++++---
c/driver/postgresql/statement.h | 22 +-
c/driver/snowflake/snowflake_test.cc | 2 +-
c/driver/sqlite/sqlite.c | 291 ++++++++++++-
c/driver/sqlite/sqlite_test.cc | 20 +
c/driver_manager/CMakeLists.txt | 5 +-
c/driver_manager/adbc_driver_manager.cc | 40 +-
c/driver_manager/adbc_driver_manager_test.cc | 22 +-
c/integration/duckdb/duckdb_test.cc | 4 +-
c/validation/CMakeLists.txt | 19 +-
c/validation/adbc_validation.cc | 419 ++++++++++++++++++-
c/validation/adbc_validation.h | 86 ++--
go/adbc/drivermgr/adbc.h | 6 +-
go/adbc/drivermgr/adbc_driver_manager.cc | 40 +-
26 files changed, 1735 insertions(+), 212 deletions(-)
diff --git a/adbc.h b/adbc.h
index badfc7d65a..4c13585c08 100644
--- a/adbc.h
+++ b/adbc.h
@@ -1292,7 +1292,8 @@ AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
/// or while consuming an ArrowArrayStream returned from such.
/// Calling this function should make the other functions return
/// ADBC_STATUS_CANCELLED (from ADBC functions) or ECANCELED (from
-/// methods of ArrowArrayStream).
+/// methods of ArrowArrayStream). (It is not guaranteed to, for
+/// instance, the result set may be buffered in memory already.)
///
/// This must always be thread-safe (other operations are not).
///
@@ -1947,7 +1948,8 @@ AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
/// or while consuming an ArrowArrayStream returned from such.
/// Calling this function should make the other functions return
/// ADBC_STATUS_CANCELLED (from ADBC functions) or ECANCELED (from
-/// methods of ArrowArrayStream).
+/// methods of ArrowArrayStream). (It is not guaranteed to, for
+/// instance, the result set may be buffered in memory already.)
///
/// This must always be thread-safe (other operations are not).
///
diff --git a/c/driver/common/utils.c b/c/driver/common/utils.c
index dfac14f5e4..eb01bc18a4 100644
--- a/c/driver/common/utils.c
+++ b/c/driver/common/utils.c
@@ -244,6 +244,19 @@ AdbcStatusCode AdbcConnectionGetInfoAppendString(struct ArrowArray* array,
return ADBC_STATUS_OK;
}
+AdbcStatusCode AdbcConnectionGetInfoAppendInt(struct ArrowArray* array,
+ uint32_t info_code, int64_t info_value,
+ struct AdbcError* error) {
+ CHECK_NA(INTERNAL, ArrowArrayAppendUInt(array->children[0], info_code), error);
+ // Append to type variant
+ CHECK_NA(INTERNAL, ArrowArrayAppendInt(array->children[1]->children[2], info_value),
+ error);
+ // Append type code/offset
+ CHECK_NA(INTERNAL, ArrowArrayFinishUnionElement(array->children[1], /*type_id=*/2),
+ error);
+ return ADBC_STATUS_OK;
+}
+
AdbcStatusCode AdbcInitConnectionObjectsSchema(struct ArrowSchema* schema,
struct AdbcError* error) {
ArrowSchemaInit(schema);
diff --git a/c/driver/common/utils.h b/c/driver/common/utils.h
index 5735bb945f..381c7b05ee 100644
--- a/c/driver/common/utils.h
+++ b/c/driver/common/utils.h
@@ -117,6 +117,9 @@ AdbcStatusCode AdbcConnectionGetInfoAppendString(struct ArrowArray* array,
uint32_t info_code,
const char* info_value,
struct AdbcError* error);
+AdbcStatusCode AdbcConnectionGetInfoAppendInt(struct ArrowArray* array,
+ uint32_t info_code, int64_t info_value,
+ struct AdbcError* error);
AdbcStatusCode AdbcInitConnectionObjectsSchema(struct ArrowSchema* schema,
struct AdbcError* error);
diff --git a/c/driver/flightsql/dremio_flightsql_test.cc b/c/driver/flightsql/dremio_flightsql_test.cc
index c128bd49f3..416b8aeaf5 100644
--- a/c/driver/flightsql/dremio_flightsql_test.cc
+++ b/c/driver/flightsql/dremio_flightsql_test.cc
@@ -42,11 +42,11 @@ class DremioFlightSqlQuirks : public adbc_validation::DriverQuirks {
}
std::string BindParameter(int index) const override { return "?"; }
+ bool supports_bulk_ingest(const char* /*mode*/) const override { return false; }
bool supports_concurrent_statements() const override { return true; }
bool supports_transactions() const override { return false; }
bool supports_get_sql_info() const override { return false; }
bool supports_get_objects() const override { return true; }
- bool supports_bulk_ingest() const override { return false; }
bool supports_partitioned_data() const override { return true; }
bool supports_dynamic_parameter_binding() const override { return false; }
};
@@ -87,6 +87,7 @@ class DremioFlightSqlStatementTest : public ::testing::Test,
void SetUp() override { ASSERT_NO_FATAL_FAILURE(SetUpTest()); }
void TearDown() override { ASSERT_NO_FATAL_FAILURE(TearDownTest()); }
+ void TestResultInvalidation() { GTEST_SKIP() << "Dremio generates a CANCELLED"; }
void TestSqlIngestTableEscaping() { GTEST_SKIP() << "Table escaping not implemented"; }
protected:
diff --git a/c/driver/flightsql/sqlite_flightsql_test.cc b/c/driver/flightsql/sqlite_flightsql_test.cc
index 043261971a..7ee45a250f 100644
--- a/c/driver/flightsql/sqlite_flightsql_test.cc
+++ b/c/driver/flightsql/sqlite_flightsql_test.cc
@@ -90,6 +90,8 @@ class SqliteFlightSqlQuirks : public adbc_validation::DriverQuirks {
}
std::string BindParameter(int index) const override { return "?"; }
+
+ bool supports_bulk_ingest(const char* /*mode*/) const override { return false; }
bool supports_concurrent_statements() const override { return true; }
bool supports_transactions() const override { return false; }
bool supports_get_sql_info() const override { return true; }
@@ -113,7 +115,6 @@ class SqliteFlightSqlQuirks : public adbc_validation::DriverQuirks {
}
}
bool supports_get_objects() const override { return true; }
- bool supports_bulk_ingest() const override { return false; }
bool supports_partitioned_data() const override { return true; }
bool supports_dynamic_parameter_binding() const override { return true; }
};
diff --git a/c/driver/postgresql/connection.cc b/c/driver/postgresql/connection.cc
index 08ff9027c3..1c1b0507e5 100644
--- a/c/driver/postgresql/connection.cc
+++ b/c/driver/postgresql/connection.cc
@@ -36,8 +36,9 @@
namespace {
static const uint32_t kSupportedInfoCodes[] = {
- ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION, ADBC_INFO_DRIVER_NAME,
- ADBC_INFO_DRIVER_VERSION, ADBC_INFO_DRIVER_ARROW_VERSION,
+ ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION,
+ ADBC_INFO_DRIVER_NAME, ADBC_INFO_DRIVER_VERSION,
+ ADBC_INFO_DRIVER_ARROW_VERSION, ADBC_INFO_DRIVER_ADBC_VERSION,
};
static const std::unordered_map kPgTableTypes = {
@@ -114,8 +115,10 @@ class PqResultHelper {
result_ = PQexecPrepared(conn_, "", param_values_.size(), param_c_strs.data(), NULL,
NULL, 0);
- if (PQresultStatus(result_) != PGRES_TUPLES_OK) {
- SetError(error_, "[libpq] Failed to execute query: %s", PQerrorMessage(conn_));
+ ExecStatusType status = PQresultStatus(result_);
+ if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK) {
+ SetError(error_, "[libpq] Failed to execute query '%s': %s", query_.c_str(),
+ PQerrorMessage(conn_));
return ADBC_STATUS_IO;
}
@@ -729,6 +732,20 @@ class PqGetObjectsHelper {
namespace adbcpq {
+AdbcStatusCode PostgresConnection::Cancel(struct AdbcError* error) {
+ // > errbuf must be a char array of size errbufsize (the recommended size is
+ // > 256 bytes).
+ // https://www.postgresql.org/docs/current/libpq-cancel.html
+ char errbuf[256];
+ // > The return value is 1 if the cancel request was successfully dispatched
+ // > and 0 if not.
+ if (PQcancel(cancel_, errbuf, sizeof(errbuf)) != 1) {
+ SetError(error, "[libpq] Failed to cancel operation: %s", errbuf);
+ return ADBC_STATUS_UNKNOWN;
+ }
+ return ADBC_STATUS_OK;
+}
+
AdbcStatusCode PostgresConnection::Commit(struct AdbcError* error) {
if (autocommit_) {
SetError(error, "%s", "[libpq] Cannot commit when autocommit is enabled");
@@ -776,6 +793,10 @@ AdbcStatusCode PostgresConnectionGetInfoImpl(const uint32_t* info_codes,
RAISE_ADBC(AdbcConnectionGetInfoAppendString(array, info_codes[i],
NANOARROW_VERSION, error));
break;
+ case ADBC_INFO_DRIVER_ADBC_VERSION:
+ RAISE_ADBC(AdbcConnectionGetInfoAppendInt(array, info_codes[i],
+ ADBC_VERSION_1_1_0, error));
+ break;
default:
// Ignore
continue;
@@ -840,6 +861,47 @@ AdbcStatusCode PostgresConnection::GetObjects(
return BatchToArrayStream(&array, &schema, out, error);
}
+AdbcStatusCode PostgresConnection::GetOption(const char* option, char* value,
+ size_t* length, struct AdbcError* error) {
+ std::string output;
+ if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_CATALOG) == 0) {
+ output = PQdb(conn_);
+ } else if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) {
+ PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA", {}, error};
+ RAISE_ADBC(result_helper.Prepare());
+ RAISE_ADBC(result_helper.Execute());
+ auto it = result_helper.begin();
+ if (it == result_helper.end()) {
+ SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT CURRENT_SCHEMA'");
+ return ADBC_STATUS_INTERNAL;
+ }
+ output = (*it)[0].data;
+ } else if (std::strcmp(option, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
+ output = autocommit_ ? ADBC_OPTION_VALUE_ENABLED : ADBC_OPTION_VALUE_DISABLED;
+ } else {
+ return ADBC_STATUS_NOT_FOUND;
+ }
+
+ if (output.size() + 1 <= *length) {
+ std::memcpy(value, output.c_str(), output.size() + 1);
+ }
+ *length = output.size() + 1;
+ return ADBC_STATUS_OK;
+}
+AdbcStatusCode PostgresConnection::GetOptionBytes(const char* option, uint8_t* value,
+ size_t* length,
+ struct AdbcError* error) {
+ return ADBC_STATUS_NOT_FOUND;
+}
+AdbcStatusCode PostgresConnection::GetOptionInt(const char* option, int64_t* value,
+ struct AdbcError* error) {
+ return ADBC_STATUS_NOT_FOUND;
+}
+AdbcStatusCode PostgresConnection::GetOptionDouble(const char* option, double* value,
+ struct AdbcError* error) {
+ return ADBC_STATUS_NOT_FOUND;
+}
+
AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog,
const char* db_schema,
const char* table_name,
@@ -964,16 +1026,26 @@ AdbcStatusCode PostgresConnection::GetTableTypes(struct AdbcConnection* connecti
AdbcStatusCode PostgresConnection::Init(struct AdbcDatabase* database,
struct AdbcError* error) {
if (!database || !database->private_data) {
- SetError(error, "%s", "[libpq] Must provide an initialized AdbcDatabase");
+ SetError(error, "[libpq] Must provide an initialized AdbcDatabase");
return ADBC_STATUS_INVALID_ARGUMENT;
}
database_ =
*reinterpret_cast*>(database->private_data);
type_resolver_ = database_->type_resolver();
- return database_->Connect(&conn_, error);
+ RAISE_ADBC(database_->Connect(&conn_, error));
+ cancel_ = PQgetCancel(conn_);
+ if (!cancel_) {
+ SetError(error, "[libpq] Could not initialize PGcancel");
+ return ADBC_STATUS_UNKNOWN;
+ }
+ return ADBC_STATUS_OK;
}
AdbcStatusCode PostgresConnection::Release(struct AdbcError* error) {
+ if (cancel_) {
+ PQfreeCancel(cancel_);
+ cancel_ = nullptr;
+ }
if (conn_) {
return database_->Disconnect(&conn_, error);
}
@@ -1023,8 +1095,35 @@ AdbcStatusCode PostgresConnection::SetOption(const char* key, const char* value,
autocommit_ = autocommit;
}
return ADBC_STATUS_OK;
+ } else if (std::strcmp(key, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) {
+ // PostgreSQL doesn't accept a parameter here
+ PqResultHelper result_helper{
+ conn_, std::string("SET search_path TO ") + value, {}, error};
+ RAISE_ADBC(result_helper.Prepare());
+ RAISE_ADBC(result_helper.Execute());
+ return ADBC_STATUS_OK;
}
SetError(error, "%s%s", "[libpq] Unknown option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}
+
+AdbcStatusCode PostgresConnection::SetOptionBytes(const char* key, const uint8_t* value,
+ size_t length,
+ struct AdbcError* error) {
+ SetError(error, "%s%s", "[libpq] Unknown option ", key);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode PostgresConnection::SetOptionDouble(const char* key, double value,
+ struct AdbcError* error) {
+ SetError(error, "%s%s", "[libpq] Unknown option ", key);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode PostgresConnection::SetOptionInt(const char* key, int64_t value,
+ struct AdbcError* error) {
+ SetError(error, "%s%s", "[libpq] Unknown option ", key);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
} // namespace adbcpq
diff --git a/c/driver/postgresql/connection.h b/c/driver/postgresql/connection.h
index 74315ee053..a990056424 100644
--- a/c/driver/postgresql/connection.h
+++ b/c/driver/postgresql/connection.h
@@ -29,8 +29,10 @@ namespace adbcpq {
class PostgresDatabase;
class PostgresConnection {
public:
- PostgresConnection() : database_(nullptr), conn_(nullptr), autocommit_(true) {}
+ PostgresConnection()
+ : database_(nullptr), conn_(nullptr), cancel_(nullptr), autocommit_(true) {}
+ AdbcStatusCode Cancel(struct AdbcError* error);
AdbcStatusCode Commit(struct AdbcError* error);
AdbcStatusCode GetInfo(struct AdbcConnection* connection, uint32_t* info_codes,
size_t info_codes_length, struct ArrowArrayStream* out,
@@ -40,6 +42,14 @@ class PostgresConnection {
const char* table_name, const char** table_types,
const char* column_name, struct ArrowArrayStream* out,
struct AdbcError* error);
+ AdbcStatusCode GetOption(const char* option, char* value, size_t* length,
+ struct AdbcError* error);
+ AdbcStatusCode GetOptionBytes(const char* option, uint8_t* value, size_t* length,
+ struct AdbcError* error);
+ AdbcStatusCode GetOptionDouble(const char* option, double* value,
+ struct AdbcError* error);
+ AdbcStatusCode GetOptionInt(const char* option, int64_t* value,
+ struct AdbcError* error);
AdbcStatusCode GetTableSchema(const char* catalog, const char* db_schema,
const char* table_name, struct ArrowSchema* schema,
struct AdbcError* error);
@@ -49,6 +59,10 @@ class PostgresConnection {
AdbcStatusCode Release(struct AdbcError* error);
AdbcStatusCode Rollback(struct AdbcError* error);
AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error);
+ AdbcStatusCode SetOptionBytes(const char* key, const uint8_t* value, size_t length,
+ struct AdbcError* error);
+ AdbcStatusCode SetOptionDouble(const char* key, double value, struct AdbcError* error);
+ AdbcStatusCode SetOptionInt(const char* key, int64_t value, struct AdbcError* error);
PGconn* conn() const { return conn_; }
const std::shared_ptr& type_resolver() const {
@@ -60,6 +74,7 @@ class PostgresConnection {
std::shared_ptr database_;
std::shared_ptr type_resolver_;
PGconn* conn_;
+ PGcancel* cancel_;
bool autocommit_;
};
} // namespace adbcpq
diff --git a/c/driver/postgresql/database.cc b/c/driver/postgresql/database.cc
index 3976c4b08d..2e18b5bf30 100644
--- a/c/driver/postgresql/database.cc
+++ b/c/driver/postgresql/database.cc
@@ -36,6 +36,23 @@ PostgresDatabase::PostgresDatabase() : open_connections_(0) {
}
PostgresDatabase::~PostgresDatabase() = default;
+AdbcStatusCode PostgresDatabase::GetOption(const char* option, char* value,
+ size_t* length, struct AdbcError* error) {
+ return ADBC_STATUS_NOT_FOUND;
+}
+AdbcStatusCode PostgresDatabase::GetOptionBytes(const char* option, uint8_t* value,
+ size_t* length, struct AdbcError* error) {
+ return ADBC_STATUS_NOT_FOUND;
+}
+AdbcStatusCode PostgresDatabase::GetOptionInt(const char* option, int64_t* value,
+ struct AdbcError* error) {
+ return ADBC_STATUS_NOT_FOUND;
+}
+AdbcStatusCode PostgresDatabase::GetOptionDouble(const char* option, double* value,
+ struct AdbcError* error) {
+ return ADBC_STATUS_NOT_FOUND;
+}
+
AdbcStatusCode PostgresDatabase::Init(struct AdbcError* error) {
// Connect to validate the parameters.
return RebuildTypeResolver(error);
@@ -61,6 +78,24 @@ AdbcStatusCode PostgresDatabase::SetOption(const char* key, const char* value,
return ADBC_STATUS_OK;
}
+AdbcStatusCode PostgresDatabase::SetOptionBytes(const char* key, const uint8_t* value,
+ size_t length, struct AdbcError* error) {
+ SetError(error, "%s%s", "[libpq] Unknown option ", key);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode PostgresDatabase::SetOptionDouble(const char* key, double value,
+ struct AdbcError* error) {
+ SetError(error, "%s%s", "[libpq] Unknown option ", key);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode PostgresDatabase::SetOptionInt(const char* key, int64_t value,
+ struct AdbcError* error) {
+ SetError(error, "%s%s", "[libpq] Unknown option ", key);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
AdbcStatusCode PostgresDatabase::Connect(PGconn** conn, struct AdbcError* error) {
if (uri_.empty()) {
SetError(error, "%s",
diff --git a/c/driver/postgresql/database.h b/c/driver/postgresql/database.h
index f10464787a..9269b1958b 100644
--- a/c/driver/postgresql/database.h
+++ b/c/driver/postgresql/database.h
@@ -36,7 +36,19 @@ class PostgresDatabase {
AdbcStatusCode Init(struct AdbcError* error);
AdbcStatusCode Release(struct AdbcError* error);
+ AdbcStatusCode GetOption(const char* option, char* value, size_t* length,
+ struct AdbcError* error);
+ AdbcStatusCode GetOptionBytes(const char* option, uint8_t* value, size_t* length,
+ struct AdbcError* error);
+ AdbcStatusCode GetOptionDouble(const char* option, double* value,
+ struct AdbcError* error);
+ AdbcStatusCode GetOptionInt(const char* option, int64_t* value,
+ struct AdbcError* error);
AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error);
+ AdbcStatusCode SetOptionBytes(const char* key, const uint8_t* value, size_t length,
+ struct AdbcError* error);
+ AdbcStatusCode SetOptionDouble(const char* key, double value, struct AdbcError* error);
+ AdbcStatusCode SetOptionInt(const char* key, int64_t value, struct AdbcError* error);
// Internal implementation
diff --git a/c/driver/postgresql/postgres_copy_reader.h b/c/driver/postgresql/postgres_copy_reader.h
index 7d3844f86b..75aa5db7ed 100644
--- a/c/driver/postgresql/postgres_copy_reader.h
+++ b/c/driver/postgresql/postgres_copy_reader.h
@@ -843,12 +843,13 @@ static inline ArrowErrorCode MakeCopyFieldReader(const PostgresType& pg_type,
class PostgresCopyStreamReader {
public:
- ArrowErrorCode Init(const PostgresType& pg_type) {
+ ArrowErrorCode Init(PostgresType pg_type) {
if (pg_type.type_id() != PostgresTypeId::kRecord) {
return EINVAL;
}
- root_reader_.Init(pg_type);
+ pg_type_ = std::move(pg_type);
+ root_reader_.Init(pg_type_);
array_size_approx_bytes_ = 0;
return NANOARROW_OK;
}
@@ -972,7 +973,10 @@ class PostgresCopyStreamReader {
return NANOARROW_OK;
}
+ const PostgresType& pg_type() const { return pg_type_; }
+
private:
+ PostgresType pg_type_;
PostgresCopyFieldTupleReader root_reader_;
nanoarrow::UniqueSchema schema_;
nanoarrow::UniqueArray array_;
diff --git a/c/driver/postgresql/postgresql.cc b/c/driver/postgresql/postgresql.cc
index 95e6c8b881..9f6730eb4f 100644
--- a/c/driver/postgresql/postgresql.cc
+++ b/c/driver/postgresql/postgresql.cc
@@ -83,14 +83,92 @@ AdbcStatusCode PostgresDatabaseRelease(struct AdbcDatabase* database,
return status;
}
+AdbcStatusCode PostgresDatabaseGetOption(struct AdbcDatabase* database, const char* key,
+ char* value, size_t* length,
+ struct AdbcError* error) {
+ if (!database->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr = reinterpret_cast*>(database->private_data);
+ return (*ptr)->GetOption(key, value, length, error);
+}
+
+AdbcStatusCode PostgresDatabaseGetOptionBytes(struct AdbcDatabase* database,
+ const char* key, uint8_t* value,
+ size_t* length, struct AdbcError* error) {
+ if (!database->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr = reinterpret_cast*>(database->private_data);
+ return (*ptr)->GetOptionBytes(key, value, length, error);
+}
+
+AdbcStatusCode PostgresDatabaseGetOptionDouble(struct AdbcDatabase* database,
+ const char* key, double* value,
+ struct AdbcError* error) {
+ if (!database->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr = reinterpret_cast*>(database->private_data);
+ return (*ptr)->GetOptionDouble(key, value, error);
+}
+
+AdbcStatusCode PostgresDatabaseGetOptionInt(struct AdbcDatabase* database,
+ const char* key, int64_t* value,
+ struct AdbcError* error) {
+ if (!database->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr = reinterpret_cast*>(database->private_data);
+ return (*ptr)->GetOptionInt(key, value, error);
+}
+
AdbcStatusCode PostgresDatabaseSetOption(struct AdbcDatabase* database, const char* key,
const char* value, struct AdbcError* error) {
if (!database || !database->private_data) return ADBC_STATUS_INVALID_STATE;
auto ptr = reinterpret_cast*>(database->private_data);
return (*ptr)->SetOption(key, value, error);
}
+
+AdbcStatusCode PostgresDatabaseSetOptionBytes(struct AdbcDatabase* database,
+ const char* key, const uint8_t* value,
+ size_t length, struct AdbcError* error) {
+ if (!database->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr = reinterpret_cast*>(database->private_data);
+ return (*ptr)->SetOptionBytes(key, value, length, error);
+}
+
+AdbcStatusCode PostgresDatabaseSetOptionDouble(struct AdbcDatabase* database,
+ const char* key, double value,
+ struct AdbcError* error) {
+ if (!database->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr = reinterpret_cast*>(database->private_data);
+ return (*ptr)->SetOptionDouble(key, value, error);
+}
+
+AdbcStatusCode PostgresDatabaseSetOptionInt(struct AdbcDatabase* database,
+ const char* key, int64_t value,
+ struct AdbcError* error) {
+ if (!database->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr = reinterpret_cast*>(database->private_data);
+ return (*ptr)->SetOptionInt(key, value, error);
+}
} // namespace
+AdbcStatusCode AdbcDatabaseGetOption(struct AdbcDatabase* database, const char* key,
+ char* value, size_t* length,
+ struct AdbcError* error) {
+ return PostgresDatabaseGetOption(database, key, value, length, error);
+}
+
+AdbcStatusCode AdbcDatabaseGetOptionBytes(struct AdbcDatabase* database, const char* key,
+ uint8_t* value, size_t* length,
+ struct AdbcError* error) {
+ return PostgresDatabaseGetOptionBytes(database, key, value, length, error);
+}
+
+AdbcStatusCode AdbcDatabaseGetOptionInt(struct AdbcDatabase* database, const char* key,
+ int64_t* value, struct AdbcError* error) {
+ return PostgresDatabaseGetOptionInt(database, key, value, error);
+}
+
+AdbcStatusCode AdbcDatabaseGetOptionDouble(struct AdbcDatabase* database, const char* key,
+ double* value, struct AdbcError* error) {
+ return PostgresDatabaseGetOptionDouble(database, key, value, error);
+}
+
AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) {
return PostgresDatabaseInit(database, error);
}
@@ -109,10 +187,34 @@ AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char*
return PostgresDatabaseSetOption(database, key, value, error);
}
+AdbcStatusCode AdbcDatabaseSetOptionBytes(struct AdbcDatabase* database, const char* key,
+ const uint8_t* value, size_t length,
+ struct AdbcError* error) {
+ return PostgresDatabaseSetOptionBytes(database, key, value, length, error);
+}
+
+AdbcStatusCode AdbcDatabaseSetOptionInt(struct AdbcDatabase* database, const char* key,
+ int64_t value, struct AdbcError* error) {
+ return PostgresDatabaseSetOptionInt(database, key, value, error);
+}
+
+AdbcStatusCode AdbcDatabaseSetOptionDouble(struct AdbcDatabase* database, const char* key,
+ double value, struct AdbcError* error) {
+ return PostgresDatabaseSetOptionDouble(database, key, value, error);
+}
+
// ---------------------------------------------------------------------
// AdbcConnection
namespace {
+AdbcStatusCode PostgresConnectionCancel(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(connection->private_data);
+ return (*ptr)->Cancel(error);
+}
+
AdbcStatusCode PostgresConnectionCommit(struct AdbcConnection* connection,
struct AdbcError* error) {
if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
@@ -142,6 +244,42 @@ AdbcStatusCode PostgresConnectionGetObjects(
table_types, column_name, stream, error);
}
+AdbcStatusCode PostgresConnectionGetOption(struct AdbcConnection* connection,
+ const char* key, char* value, size_t* length,
+ struct AdbcError* error) {
+ if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(connection->private_data);
+ return (*ptr)->GetOption(key, value, length, error);
+}
+
+AdbcStatusCode PostgresConnectionGetOptionBytes(struct AdbcConnection* connection,
+ const char* key, uint8_t* value,
+ size_t* length, struct AdbcError* error) {
+ if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(connection->private_data);
+ return (*ptr)->GetOptionBytes(key, value, length, error);
+}
+
+AdbcStatusCode PostgresConnectionGetOptionDouble(struct AdbcConnection* connection,
+ const char* key, double* value,
+ struct AdbcError* error) {
+ if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(connection->private_data);
+ return (*ptr)->GetOptionDouble(key, value, error);
+}
+
+AdbcStatusCode PostgresConnectionGetOptionInt(struct AdbcConnection* connection,
+ const char* key, int64_t* value,
+ struct AdbcError* error) {
+ if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(connection->private_data);
+ return (*ptr)->GetOptionInt(key, value, error);
+}
+
AdbcStatusCode PostgresConnectionGetTableSchema(
struct AdbcConnection* connection, const char* catalog, const char* db_schema,
const char* table_name, struct ArrowSchema* schema, struct AdbcError* error) {
@@ -213,7 +351,40 @@ AdbcStatusCode PostgresConnectionSetOption(struct AdbcConnection* connection,
return (*ptr)->SetOption(key, value, error);
}
+AdbcStatusCode PostgresConnectionSetOptionBytes(struct AdbcConnection* connection,
+ const char* key, const uint8_t* value,
+ size_t length, struct AdbcError* error) {
+ if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(connection->private_data);
+ return (*ptr)->SetOptionBytes(key, value, length, error);
+}
+
+AdbcStatusCode PostgresConnectionSetOptionDouble(struct AdbcConnection* connection,
+ const char* key, double value,
+ struct AdbcError* error) {
+ if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(connection->private_data);
+ return (*ptr)->SetOptionDouble(key, value, error);
+}
+
+AdbcStatusCode PostgresConnectionSetOptionInt(struct AdbcConnection* connection,
+ const char* key, int64_t value,
+ struct AdbcError* error) {
+ if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(connection->private_data);
+ return (*ptr)->SetOptionInt(key, value, error);
+}
+
} // namespace
+
+AdbcStatusCode AdbcConnectionCancel(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ return PostgresConnectionCancel(connection, error);
+}
+
AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
struct AdbcError* error) {
return PostgresConnectionCommit(connection, error);
@@ -237,6 +408,30 @@ AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int d
table_types, column_name, stream, error);
}
+AdbcStatusCode AdbcConnectionGetOption(struct AdbcConnection* connection, const char* key,
+ char* value, size_t* length,
+ struct AdbcError* error) {
+ return PostgresConnectionGetOption(connection, key, value, length, error);
+}
+
+AdbcStatusCode AdbcConnectionGetOptionBytes(struct AdbcConnection* connection,
+ const char* key, uint8_t* value,
+ size_t* length, struct AdbcError* error) {
+ return PostgresConnectionGetOptionBytes(connection, key, value, length, error);
+}
+
+AdbcStatusCode AdbcConnectionGetOptionInt(struct AdbcConnection* connection,
+ const char* key, int64_t* value,
+ struct AdbcError* error) {
+ return PostgresConnectionGetOptionInt(connection, key, value, error);
+}
+
+AdbcStatusCode AdbcConnectionGetOptionDouble(struct AdbcConnection* connection,
+ const char* key, double* value,
+ struct AdbcError* error) {
+ return PostgresConnectionGetOptionDouble(connection, key, value, error);
+}
+
AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name,
@@ -287,6 +482,24 @@ AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const
return PostgresConnectionSetOption(connection, key, value, error);
}
+AdbcStatusCode AdbcConnectionSetOptionBytes(struct AdbcConnection* connection,
+ const char* key, const uint8_t* value,
+ size_t length, struct AdbcError* error) {
+ return PostgresConnectionSetOptionBytes(connection, key, value, length, error);
+}
+
+AdbcStatusCode AdbcConnectionSetOptionInt(struct AdbcConnection* connection,
+ const char* key, int64_t value,
+ struct AdbcError* error) {
+ return PostgresConnectionSetOptionInt(connection, key, value, error);
+}
+
+AdbcStatusCode AdbcConnectionSetOptionDouble(struct AdbcConnection* connection,
+ const char* key, double value,
+ struct AdbcError* error) {
+ return PostgresConnectionSetOptionDouble(connection, key, value, error);
+}
+
// ---------------------------------------------------------------------
// AdbcStatement
@@ -310,6 +523,14 @@ AdbcStatusCode PostgresStatementBindStream(struct AdbcStatement* statement,
return (*ptr)->Bind(stream, error);
}
+AdbcStatusCode PostgresStatementCancel(struct AdbcStatement* statement,
+ struct AdbcError* error) {
+ if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto* ptr =
+ reinterpret_cast*>(statement->private_data);
+ return (*ptr)->Cancel(error);
+}
+
AdbcStatusCode PostgresStatementExecutePartitions(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcPartitions* partitions,
@@ -329,16 +550,49 @@ AdbcStatusCode PostgresStatementExecuteQuery(struct AdbcStatement* statement,
return (*ptr)->ExecuteQuery(output, rows_affected, error);
}
-AdbcStatusCode PostgresStatementGetPartitionDesc(struct AdbcStatement* statement,
- uint8_t* partition_desc,
- struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+AdbcStatusCode PostgresStatementExecuteSchema(struct AdbcStatement* statement,
+ struct ArrowSchema* schema,
+ struct AdbcError* error) {
+ if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto* ptr =
+ reinterpret_cast*>(statement->private_data);
+ return (*ptr)->ExecuteSchema(schema, error);
}
-AdbcStatusCode PostgresStatementGetPartitionDescSize(struct AdbcStatement* statement,
- size_t* length,
- struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+AdbcStatusCode PostgresStatementGetOption(struct AdbcStatement* statement,
+ const char* key, char* value, size_t* length,
+ struct AdbcError* error) {
+ if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(statement->private_data);
+ return (*ptr)->GetOption(key, value, length, error);
+}
+
+AdbcStatusCode PostgresStatementGetOptionBytes(struct AdbcStatement* statement,
+ const char* key, uint8_t* value,
+ size_t* length, struct AdbcError* error) {
+ if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(statement->private_data);
+ return (*ptr)->GetOptionBytes(key, value, length, error);
+}
+
+AdbcStatusCode PostgresStatementGetOptionDouble(struct AdbcStatement* statement,
+ const char* key, double* value,
+ struct AdbcError* error) {
+ if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(statement->private_data);
+ return (*ptr)->GetOptionDouble(key, value, error);
+}
+
+AdbcStatusCode PostgresStatementGetOptionInt(struct AdbcStatement* statement,
+ const char* key, int64_t* value,
+ struct AdbcError* error) {
+ if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(statement->private_data);
+ return (*ptr)->GetOptionInt(key, value, error);
}
AdbcStatusCode PostgresStatementGetParameterSchema(struct AdbcStatement* statement,
@@ -386,6 +640,33 @@ AdbcStatusCode PostgresStatementSetOption(struct AdbcStatement* statement,
return (*ptr)->SetOption(key, value, error);
}
+AdbcStatusCode PostgresStatementSetOptionBytes(struct AdbcStatement* statement,
+ const char* key, const uint8_t* value,
+ size_t length, struct AdbcError* error) {
+ if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(statement->private_data);
+ return (*ptr)->SetOptionBytes(key, value, length, error);
+}
+
+AdbcStatusCode PostgresStatementSetOptionDouble(struct AdbcStatement* statement,
+ const char* key, double value,
+ struct AdbcError* error) {
+ if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(statement->private_data);
+ return (*ptr)->SetOptionDouble(key, value, error);
+}
+
+AdbcStatusCode PostgresStatementSetOptionInt(struct AdbcStatement* statement,
+ const char* key, int64_t value,
+ struct AdbcError* error) {
+ if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast*>(statement->private_data);
+ return (*ptr)->SetOptionInt(key, value, error);
+}
+
AdbcStatusCode PostgresStatementSetSqlQuery(struct AdbcStatement* statement,
const char* query, struct AdbcError* error) {
if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
@@ -407,6 +688,11 @@ AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
return PostgresStatementBindStream(statement, stream, error);
}
+AdbcStatusCode AdbcStatementCancel(struct AdbcStatement* statement,
+ struct AdbcError* error) {
+ return PostgresStatementCancel(statement, error);
+}
+
AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement,
ArrowSchema* schema,
struct AdbcPartitions* partitions,
@@ -423,16 +709,32 @@ AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
return PostgresStatementExecuteQuery(statement, output, rows_affected, error);
}
-AdbcStatusCode AdbcStatementGetPartitionDesc(struct AdbcStatement* statement,
- uint8_t* partition_desc,
- struct AdbcError* error) {
- return PostgresStatementGetPartitionDesc(statement, partition_desc, error);
+AdbcStatusCode AdbcStatementExecuteSchema(struct AdbcStatement* statement,
+ ArrowSchema* schema, struct AdbcError* error) {
+ return PostgresStatementExecuteSchema(statement, schema, error);
}
-AdbcStatusCode AdbcStatementGetPartitionDescSize(struct AdbcStatement* statement,
- size_t* length,
- struct AdbcError* error) {
- return PostgresStatementGetPartitionDescSize(statement, length, error);
+AdbcStatusCode AdbcStatementGetOption(struct AdbcStatement* statement, const char* key,
+ char* value, size_t* length,
+ struct AdbcError* error) {
+ return PostgresStatementGetOption(statement, key, value, length, error);
+}
+
+AdbcStatusCode AdbcStatementGetOptionBytes(struct AdbcStatement* statement,
+ const char* key, uint8_t* value,
+ size_t* length, struct AdbcError* error) {
+ return PostgresStatementGetOptionBytes(statement, key, value, length, error);
+}
+
+AdbcStatusCode AdbcStatementGetOptionInt(struct AdbcStatement* statement, const char* key,
+ int64_t* value, struct AdbcError* error) {
+ return PostgresStatementGetOptionInt(statement, key, value, error);
+}
+
+AdbcStatusCode AdbcStatementGetOptionDouble(struct AdbcStatement* statement,
+ const char* key, double* value,
+ struct AdbcError* error) {
+ return PostgresStatementGetOptionDouble(statement, key, value, error);
}
AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement,
@@ -462,6 +764,23 @@ AdbcStatusCode AdbcStatementSetOption(struct AdbcStatement* statement, const cha
return PostgresStatementSetOption(statement, key, value, error);
}
+AdbcStatusCode AdbcStatementSetOptionBytes(struct AdbcStatement* statement,
+ const char* key, const uint8_t* value,
+ size_t length, struct AdbcError* error) {
+ return PostgresStatementSetOptionBytes(statement, key, value, length, error);
+}
+
+AdbcStatusCode AdbcStatementSetOptionInt(struct AdbcStatement* statement, const char* key,
+ int64_t value, struct AdbcError* error) {
+ return PostgresStatementSetOptionInt(statement, key, value, error);
+}
+
+AdbcStatusCode AdbcStatementSetOptionDouble(struct AdbcStatement* statement,
+ const char* key, double value,
+ struct AdbcError* error) {
+ return PostgresStatementSetOptionDouble(statement, key, value, error);
+}
+
AdbcStatusCode AdbcStatementSetSqlQuery(struct AdbcStatement* statement,
const char* query, struct AdbcError* error) {
return PostgresStatementSetSqlQuery(statement, query, error);
@@ -469,12 +788,47 @@ AdbcStatusCode AdbcStatementSetSqlQuery(struct AdbcStatement* statement,
extern "C" {
ADBC_EXPORT
-AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* error) {
- if (version != ADBC_VERSION_1_0_0) return ADBC_STATUS_NOT_IMPLEMENTED;
+AdbcStatusCode PostgresqlDriverInit(int version, void* raw_driver,
+ struct AdbcError* error) {
+ if (version != ADBC_VERSION_1_0_0 && version != ADBC_VERSION_1_1_0) {
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+ }
if (!raw_driver) return ADBC_STATUS_INVALID_ARGUMENT;
auto* driver = reinterpret_cast(raw_driver);
- std::memset(driver, 0, ADBC_DRIVER_1_0_0_SIZE);
+ if (version >= ADBC_VERSION_1_1_0) {
+ std::memset(driver, 0, ADBC_DRIVER_1_1_0_SIZE);
+
+ driver->DatabaseGetOption = PostgresDatabaseGetOption;
+ driver->DatabaseGetOptionBytes = PostgresDatabaseGetOptionBytes;
+ driver->DatabaseGetOptionDouble = PostgresDatabaseGetOptionDouble;
+ driver->DatabaseGetOptionInt = PostgresDatabaseGetOptionInt;
+ driver->DatabaseSetOptionBytes = PostgresDatabaseSetOptionBytes;
+ driver->DatabaseSetOptionDouble = PostgresDatabaseSetOptionDouble;
+ driver->DatabaseSetOptionInt = PostgresDatabaseSetOptionInt;
+
+ driver->ConnectionCancel = PostgresConnectionCancel;
+ driver->ConnectionGetOption = PostgresConnectionGetOption;
+ driver->ConnectionGetOptionBytes = PostgresConnectionGetOptionBytes;
+ driver->ConnectionGetOptionDouble = PostgresConnectionGetOptionDouble;
+ driver->ConnectionGetOptionInt = PostgresConnectionGetOptionInt;
+ driver->ConnectionSetOptionBytes = PostgresConnectionSetOptionBytes;
+ driver->ConnectionSetOptionDouble = PostgresConnectionSetOptionDouble;
+ driver->ConnectionSetOptionInt = PostgresConnectionSetOptionInt;
+
+ driver->StatementCancel = PostgresStatementCancel;
+ driver->StatementExecuteSchema = PostgresStatementExecuteSchema;
+ driver->StatementGetOption = PostgresStatementGetOption;
+ driver->StatementGetOptionBytes = PostgresStatementGetOptionBytes;
+ driver->StatementGetOptionDouble = PostgresStatementGetOptionDouble;
+ driver->StatementGetOptionInt = PostgresStatementGetOptionInt;
+ driver->StatementSetOptionBytes = PostgresStatementSetOptionBytes;
+ driver->StatementSetOptionDouble = PostgresStatementSetOptionDouble;
+ driver->StatementSetOptionInt = PostgresStatementSetOptionInt;
+ } else {
+ std::memset(driver, 0, ADBC_DRIVER_1_0_0_SIZE);
+ }
+
driver->DatabaseInit = PostgresDatabaseInit;
driver->DatabaseNew = PostgresDatabaseNew;
driver->DatabaseRelease = PostgresDatabaseRelease;
@@ -502,6 +856,12 @@ AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* e
driver->StatementRelease = PostgresStatementRelease;
driver->StatementSetOption = PostgresStatementSetOption;
driver->StatementSetSqlQuery = PostgresStatementSetSqlQuery;
+
return ADBC_STATUS_OK;
}
+
+ADBC_EXPORT
+AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* error) {
+ return PostgresqlDriverInit(version, raw_driver, error);
+}
}
diff --git a/c/driver/postgresql/postgresql_test.cc b/c/driver/postgresql/postgresql_test.cc
index 33115bcf8b..9033a22f5c 100644
--- a/c/driver/postgresql/postgresql_test.cc
+++ b/c/driver/postgresql/postgresql_test.cc
@@ -103,6 +103,29 @@ class PostgresQuirks : public adbc_validation::DriverQuirks {
std::string catalog() const override { return "postgres"; }
std::string db_schema() const override { return "public"; }
+
+ bool supports_cancel() const override { return true; }
+ bool supports_execute_schema() const override { return true; }
+ std::optional supports_get_sql_info(
+ uint32_t info_code) const override {
+ switch (info_code) {
+ case ADBC_INFO_DRIVER_ADBC_VERSION:
+ return ADBC_VERSION_1_1_0;
+ case ADBC_INFO_DRIVER_NAME:
+ return "ADBC PostgreSQL Driver";
+ case ADBC_INFO_DRIVER_VERSION:
+ return "(unknown)";
+ case ADBC_INFO_VENDOR_NAME:
+ return "PostgreSQL";
+ case ADBC_INFO_VENDOR_VERSION:
+ // Strings are checked via substring match
+ return "15";
+ default:
+ return std::nullopt;
+ }
+ }
+ bool supports_metadata_current_catalog() const override { return true; }
+ bool supports_metadata_current_db_schema() const override { return true; }
};
class PostgresDatabaseTest : public ::testing::Test,
@@ -134,10 +157,8 @@ TEST_F(PostgresConnectionTest, GetInfoMetadata) {
adbc_validation::StreamReader reader;
std::vector info = {
- ADBC_INFO_DRIVER_NAME,
- ADBC_INFO_DRIVER_VERSION,
- ADBC_INFO_VENDOR_NAME,
- ADBC_INFO_VENDOR_VERSION,
+ ADBC_INFO_DRIVER_NAME, ADBC_INFO_DRIVER_VERSION, ADBC_INFO_DRIVER_ADBC_VERSION,
+ ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION,
};
ASSERT_THAT(AdbcConnectionGetInfo(&connection, info.data(), info.size(),
&reader.stream.value, &error),
@@ -153,29 +174,30 @@ TEST_F(PostgresConnectionTest, GetInfoMetadata) {
ASSERT_FALSE(ArrowArrayViewIsNull(reader.array_view->children[0], row));
const uint32_t code =
reader.array_view->children[0]->buffer_views[1].data.as_uint32[row];
+ const uint32_t offset =
+ reader.array_view->children[1]->buffer_views[1].data.as_int32[row];
seen.push_back(code);
- int str_child_index = 0;
- struct ArrowArrayView* str_child =
- reader.array_view->children[1]->children[str_child_index];
+ struct ArrowArrayView* str_child = reader.array_view->children[1]->children[0];
+ struct ArrowArrayView* int_child = reader.array_view->children[1]->children[2];
switch (code) {
case ADBC_INFO_DRIVER_NAME: {
- ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, 0);
+ ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, offset);
EXPECT_EQ("ADBC PostgreSQL Driver", std::string(val.data, val.size_bytes));
break;
}
case ADBC_INFO_DRIVER_VERSION: {
- ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, 1);
+ ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, offset);
EXPECT_EQ("(unknown)", std::string(val.data, val.size_bytes));
break;
}
case ADBC_INFO_VENDOR_NAME: {
- ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, 2);
+ ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, offset);
EXPECT_EQ("PostgreSQL", std::string(val.data, val.size_bytes));
break;
}
case ADBC_INFO_VENDOR_VERSION: {
- ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, 3);
+ ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, offset);
#ifdef __WIN32
const char* pater = "\\d\\d\\d\\d\\d\\d";
#else
@@ -185,6 +207,10 @@ TEST_F(PostgresConnectionTest, GetInfoMetadata) {
::testing::MatchesRegex(pater));
break;
}
+ case ADBC_INFO_DRIVER_ADBC_VERSION: {
+ EXPECT_EQ(ADBC_VERSION_1_1_0, ArrowArrayViewGetIntUnsafe(int_child, offset));
+ break;
+ }
default:
// Ignored
break;
@@ -198,10 +224,6 @@ TEST_F(PostgresConnectionTest, GetObjectsGetCatalogs) {
ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
- if (!quirks()->supports_get_objects()) {
- GTEST_SKIP();
- }
-
adbc_validation::StreamReader reader;
ASSERT_THAT(
AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_CATALOGS, nullptr, nullptr,
@@ -228,10 +250,6 @@ TEST_F(PostgresConnectionTest, GetObjectsGetDbSchemas) {
ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
- if (!quirks()->supports_get_objects()) {
- GTEST_SKIP();
- }
-
adbc_validation::StreamReader reader;
ASSERT_THAT(AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_DB_SCHEMAS, nullptr,
nullptr, nullptr, nullptr, nullptr,
@@ -255,10 +273,6 @@ TEST_F(PostgresConnectionTest, GetObjectsGetAllFindsPrimaryKey) {
ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
- if (!quirks()->supports_get_objects()) {
- GTEST_SKIP();
- }
-
ASSERT_THAT(quirks()->DropTable(&connection, "adbc_pkey_test", &error),
IsOkStatus(&error));
@@ -329,10 +343,6 @@ TEST_F(PostgresConnectionTest, GetObjectsGetAllFindsForeignKey) {
ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
- if (!quirks()->supports_get_objects()) {
- GTEST_SKIP();
- }
-
ASSERT_THAT(quirks()->DropTable(&connection, "adbc_fkey_test", &error),
IsOkStatus(&error));
ASSERT_THAT(quirks()->DropTable(&connection, "adbc_fkey_test_base", &error),
@@ -450,10 +460,6 @@ TEST_F(PostgresConnectionTest, GetObjectsTableTypesFilter) {
ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
- if (!quirks()->supports_get_objects()) {
- GTEST_SKIP();
- }
-
ASSERT_THAT(quirks()->DropView(&connection, "adbc_table_types_view_test", &error),
IsOkStatus(&error));
ASSERT_THAT(quirks()->DropTable(&connection, "adbc_table_types_table_test", &error),
@@ -516,7 +522,7 @@ TEST_F(PostgresConnectionTest, GetObjectsTableTypesFilter) {
}
TEST_F(PostgresConnectionTest, MetadataGetTableSchemaInjection) {
- if (!quirks()->supports_bulk_ingest()) {
+ if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
@@ -549,6 +555,57 @@ TEST_F(PostgresConnectionTest, MetadataGetTableSchemaInjection) {
{"strings", NANOARROW_TYPE_STRING, true}}));
}
+TEST_F(PostgresConnectionTest, MetadataSetCurrentDbSchema) {
+ ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
+ ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
+
+ {
+ adbc_validation::Handle statement;
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
+
+ ASSERT_THAT(AdbcStatementSetSqlQuery(
+ &statement.value, "CREATE SCHEMA IF NOT EXISTS testschema", &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
+ IsOkStatus(&error));
+
+ ASSERT_THAT(
+ AdbcStatementSetSqlQuery(
+ &statement.value,
+ "CREATE TABLE IF NOT EXISTS testschema.schematable (ints INT)", &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
+ IsOkStatus(&error));
+
+ ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
+ }
+
+ adbc_validation::Handle statement;
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
+
+ // Table does not exist in this schema
+ ASSERT_THAT(
+ AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM schematable", &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
+ IsStatus(ADBC_STATUS_IO, &error));
+
+ ASSERT_THAT(
+ AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA,
+ "testschema", &error),
+ IsOkStatus(&error));
+
+ ASSERT_THAT(
+ AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM schematable", &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
+ IsOkStatus(&error));
+
+ ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
+}
+
ADBCV_TEST_CONNECTION(PostgresConnectionTest)
class PostgresStatementTest : public ::testing::Test,
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index 4cae15b631..fe14bac6ef 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -746,11 +746,41 @@ AdbcStatusCode PostgresStatement::Bind(struct ArrowArrayStream* stream,
return ADBC_STATUS_OK;
}
+AdbcStatusCode PostgresStatement::Cancel(struct AdbcError* error) {
+ // Ultimately the same underlying PGconn
+ return connection_->Cancel(error);
+}
+
AdbcStatusCode PostgresStatement::CreateBulkTable(
const struct ArrowSchema& source_schema,
const std::vector& source_schema_fields,
struct AdbcError* error) {
std::string create = "CREATE TABLE ";
+ switch (ingest_.mode) {
+ case IngestMode::kCreate:
+ // Nothing to do
+ break;
+ case IngestMode::kAppend:
+ return ADBC_STATUS_OK;
+ case IngestMode::kReplace: {
+ std::string drop = "DROP TABLE IF EXISTS " + ingest_.target;
+ PGresult* result = PQexecParams(connection_->conn(), drop.c_str(), /*nParams=*/0,
+ /*paramTypes=*/nullptr, /*paramValues=*/nullptr,
+ /*paramLengths=*/nullptr, /*paramFormats=*/nullptr,
+ /*resultFormat=*/1 /*(binary)*/);
+ if (PQresultStatus(result) != PGRES_COMMAND_OK) {
+ SetError(error, "[libpq] Failed to drop table: %s\nQuery was: %s",
+ PQerrorMessage(connection_->conn()), drop.c_str());
+ PQclear(result);
+ return ADBC_STATUS_IO;
+ }
+ PQclear(result);
+ break;
+ }
+ case IngestMode::kCreateAppend:
+ create += "IF NOT EXISTS ";
+ break;
+ }
create += ingest_.target;
create += " (";
@@ -867,50 +897,12 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct ArrowArrayStream* stream,
// 1. Prepare the query to get the schema
{
- // TODO: we should pipeline here and assume this will succeed
- PGresult* result = PQprepare(connection_->conn(), /*stmtName=*/"", query_.c_str(),
- /*nParams=*/0, nullptr);
- if (PQresultStatus(result) != PGRES_COMMAND_OK) {
- SetError(error,
- "[libpq] Failed to execute query: could not infer schema: failed to "
- "prepare query: %s\nQuery was:%s",
- PQerrorMessage(connection_->conn()), query_.c_str());
- PQclear(result);
- return ADBC_STATUS_IO;
- }
- PQclear(result);
- result = PQdescribePrepared(connection_->conn(), /*stmtName=*/"");
- if (PQresultStatus(result) != PGRES_COMMAND_OK) {
- SetError(error,
- "[libpq] Failed to execute query: could not infer schema: failed to "
- "describe prepared statement: %s\nQuery was:%s",
- PQerrorMessage(connection_->conn()), query_.c_str());
- PQclear(result);
- return ADBC_STATUS_IO;
- }
-
- // Resolve the information from the PGresult into a PostgresType
- PostgresType root_type;
- AdbcStatusCode status =
- ResolvePostgresType(*type_resolver_, result, &root_type, error);
- PQclear(result);
- if (status != ADBC_STATUS_OK) return status;
-
- // Initialize the copy reader and infer the output schema (i.e., error for
- // unsupported types before issuing the COPY query)
- reader_.copy_reader_.reset(new PostgresCopyStreamReader());
- reader_.copy_reader_->Init(root_type);
- struct ArrowError na_error;
- int na_res = reader_.copy_reader_->InferOutputSchema(&na_error);
- if (na_res != NANOARROW_OK) {
- SetError(error, "[libpq] Failed to infer output schema: %s", na_error.message);
- return na_res;
- }
+ RAISE_ADBC(SetupReader(error));
// If the caller did not request a result set or if there are no
// inferred output columns (e.g. a CREATE or UPDATE), then don't
// use COPY (which would fail anyways)
- if (!stream || root_type.n_children() == 0) {
+ if (!stream || reader_.copy_reader_->pg_type().n_children() == 0) {
RAISE_ADBC(ExecuteUpdateQuery(rows_affected, error));
if (stream) {
struct ArrowSchema schema;
@@ -924,7 +916,8 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct ArrowArrayStream* stream,
// This resolves the reader specific to each PostgresType -> ArrowSchema
// conversion. It is unlikely that this will fail given that we have just
// inferred these conversions ourselves.
- na_res = reader_.copy_reader_->InitFieldReaders(&na_error);
+ struct ArrowError na_error;
+ int na_res = reader_.copy_reader_->InitFieldReaders(&na_error);
if (na_res != NANOARROW_OK) {
SetError(error, "[libpq] Failed to initialize field readers: %s", na_error.message);
return na_res;
@@ -953,6 +946,23 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct ArrowArrayStream* stream,
return ADBC_STATUS_OK;
}
+AdbcStatusCode PostgresStatement::ExecuteSchema(struct ArrowSchema* schema,
+ struct AdbcError* error) {
+ ClearResult();
+ if (query_.empty()) {
+ SetError(error, "%s", "[libpq] Must SetSqlQuery before ExecuteQuery");
+ return ADBC_STATUS_INVALID_STATE;
+ } else if (bind_.release) {
+ // TODO: if we have parameters, bind them (since they can affect the output schema)
+ SetError(error, "[libpq] ExecuteSchema with parameters is not implemented");
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+ }
+
+ RAISE_ADBC(SetupReader(error));
+ CHECK_NA(INTERNAL, reader_.copy_reader_->GetSchema(schema), error);
+ return ADBC_STATUS_OK;
+}
+
AdbcStatusCode PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected,
struct AdbcError* error) {
if (!bind_.release) {
@@ -964,12 +974,8 @@ AdbcStatusCode PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected,
std::memset(&bind_, 0, sizeof(bind_));
RAISE_ADBC(bind_stream.Begin(
[&]() -> AdbcStatusCode {
- if (!ingest_.append) {
- // CREATE TABLE
- return CreateBulkTable(bind_stream.bind_schema.value,
- bind_stream.bind_schema_fields, error);
- }
- return ADBC_STATUS_OK;
+ return CreateBulkTable(bind_stream.bind_schema.value,
+ bind_stream.bind_schema_fields, error);
},
error));
RAISE_ADBC(bind_stream.SetParamTypes(*type_resolver_, error));
@@ -997,7 +1003,8 @@ AdbcStatusCode PostgresStatement::ExecuteUpdateQuery(int64_t* rows_affected,
PQexecPrepared(connection_->conn(), /*stmtName=*/"", /*nParams=*/0,
/*paramValues=*/nullptr, /*paramLengths=*/nullptr,
/*paramFormats=*/nullptr, /*resultFormat=*/kPgBinaryFormat);
- if (PQresultStatus(result) != PGRES_COMMAND_OK) {
+ ExecStatusType status = PQresultStatus(result);
+ if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) {
SetError(error, "[libpq] Failed to execute query: %s\nQuery was:%s",
PQerrorMessage(connection_->conn()), query_.c_str());
PQclear(result);
@@ -1008,6 +1015,64 @@ AdbcStatusCode PostgresStatement::ExecuteUpdateQuery(int64_t* rows_affected,
return ADBC_STATUS_OK;
}
+AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value, size_t* length,
+ struct AdbcError* error) {
+ std::string result;
+ if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
+ result = ingest_.target;
+ } else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
+ switch (ingest_.mode) {
+ case IngestMode::kCreate:
+ result = ADBC_INGEST_OPTION_MODE_CREATE;
+ break;
+ case IngestMode::kAppend:
+ result = ADBC_INGEST_OPTION_MODE_APPEND;
+ break;
+ case IngestMode::kReplace:
+ result = ADBC_INGEST_OPTION_MODE_REPLACE;
+ break;
+ case IngestMode::kCreateAppend:
+ result = ADBC_INGEST_OPTION_MODE_CREATE_APPEND;
+ break;
+ }
+ } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
+ result = std::to_string(reader_.batch_size_hint_bytes_);
+ } else {
+ SetError(error, "[libq] Unknown statement option '%s'", key);
+ return ADBC_STATUS_NOT_FOUND;
+ }
+
+ if (result.size() + 1 <= *length) {
+ std::memcpy(value, result.data(), result.size() + 1);
+ }
+ *length = static_cast(result.size() + 1);
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PostgresStatement::GetOptionBytes(const char* key, uint8_t* value,
+ size_t* length,
+ struct AdbcError* error) {
+ SetError(error, "[libq] Unknown statement option '%s'", key);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
+AdbcStatusCode PostgresStatement::GetOptionDouble(const char* key, double* value,
+ struct AdbcError* error) {
+ SetError(error, "[libq] Unknown statement option '%s'", key);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
+AdbcStatusCode PostgresStatement::GetOptionInt(const char* key, int64_t* value,
+ struct AdbcError* error) {
+ std::string result;
+ if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
+ *value = reader_.batch_size_hint_bytes_;
+ return ADBC_STATUS_OK;
+ }
+ SetError(error, "[libq] Unknown statement option '%s'", key);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
AdbcStatusCode PostgresStatement::GetParameterSchema(struct ArrowSchema* schema,
struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
@@ -1048,14 +1113,18 @@ AdbcStatusCode PostgresStatement::SetOption(const char* key, const char* value,
ingest_.target = value;
} else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) {
- ingest_.append = false;
+ ingest_.mode = IngestMode::kCreate;
} else if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_APPEND) == 0) {
- ingest_.append = true;
+ ingest_.mode = IngestMode::kAppend;
+ } else if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_REPLACE) == 0) {
+ ingest_.mode = IngestMode::kReplace;
+ } else if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE_APPEND) == 0) {
+ ingest_.mode = IngestMode::kCreateAppend;
} else {
SetError(error, "[libpq] Invalid value '%s' for option '%s'", value, key);
return ADBC_STATUS_INVALID_ARGUMENT;
}
- } else if (std::strcmp(value, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES)) {
+ } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
int64_t int_value = std::atol(value);
if (int_value <= 0) {
SetError(error, "[libpq] Invalid value '%s' for option '%s'", value, key);
@@ -1070,6 +1139,76 @@ AdbcStatusCode PostgresStatement::SetOption(const char* key, const char* value,
return ADBC_STATUS_OK;
}
+AdbcStatusCode PostgresStatement::SetOptionBytes(const char* key, const uint8_t* value,
+ size_t length, struct AdbcError* error) {
+ SetError(error, "%s%s", "[libpq] Unknown option ", key);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode PostgresStatement::SetOptionDouble(const char* key, double value,
+ struct AdbcError* error) {
+ SetError(error, "%s%s", "[libpq] Unknown option ", key);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode PostgresStatement::SetOptionInt(const char* key, int64_t value,
+ struct AdbcError* error) {
+ if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
+ if (value <= 0) {
+ SetError(error, "[libpq] Invalid value '%" PRIi64 "' for option '%s'", value, key);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+
+ this->reader_.batch_size_hint_bytes_ = value;
+ return ADBC_STATUS_OK;
+ }
+ SetError(error, "%s%s", "[libpq] Unknown option ", key);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode PostgresStatement::SetupReader(struct AdbcError* error) {
+ // TODO: we should pipeline here and assume this will succeed
+ PGresult* result = PQprepare(connection_->conn(), /*stmtName=*/"", query_.c_str(),
+ /*nParams=*/0, nullptr);
+ if (PQresultStatus(result) != PGRES_COMMAND_OK) {
+ SetError(error,
+ "[libpq] Failed to execute query: could not infer schema: failed to "
+ "prepare query: %s\nQuery was:%s",
+ PQerrorMessage(connection_->conn()), query_.c_str());
+ PQclear(result);
+ return ADBC_STATUS_IO;
+ }
+ PQclear(result);
+ result = PQdescribePrepared(connection_->conn(), /*stmtName=*/"");
+ if (PQresultStatus(result) != PGRES_COMMAND_OK) {
+ SetError(error,
+ "[libpq] Failed to execute query: could not infer schema: failed to "
+ "describe prepared statement: %s\nQuery was:%s",
+ PQerrorMessage(connection_->conn()), query_.c_str());
+ PQclear(result);
+ return ADBC_STATUS_IO;
+ }
+
+ // Resolve the information from the PGresult into a PostgresType
+ PostgresType root_type;
+ AdbcStatusCode status = ResolvePostgresType(*type_resolver_, result, &root_type, error);
+ PQclear(result);
+ if (status != ADBC_STATUS_OK) return status;
+
+ // Initialize the copy reader and infer the output schema (i.e., error for
+ // unsupported types before issuing the COPY query)
+ reader_.copy_reader_.reset(new PostgresCopyStreamReader());
+ reader_.copy_reader_->Init(root_type);
+ struct ArrowError na_error;
+ int na_res = reader_.copy_reader_->InferOutputSchema(&na_error);
+ if (na_res != NANOARROW_OK) {
+ SetError(error, "[libpq] Failed to infer output schema: (%d) %s: %s", na_res,
+ std::strerror(na_res), na_error.message);
+ return ADBC_STATUS_INTERNAL;
+ }
+ return ADBC_STATUS_OK;
+}
+
void PostgresStatement::ClearResult() {
// TODO: we may want to synchronize here for safety
reader_.Release();
diff --git a/c/driver/postgresql/statement.h b/c/driver/postgresql/statement.h
index 62af2457d5..eca3997153 100644
--- a/c/driver/postgresql/statement.h
+++ b/c/driver/postgresql/statement.h
@@ -100,13 +100,25 @@ class PostgresStatement {
AdbcStatusCode Bind(struct ArrowArray* values, struct ArrowSchema* schema,
struct AdbcError* error);
AdbcStatusCode Bind(struct ArrowArrayStream* stream, struct AdbcError* error);
+ AdbcStatusCode Cancel(struct AdbcError* error);
AdbcStatusCode ExecuteQuery(struct ArrowArrayStream* stream, int64_t* rows_affected,
struct AdbcError* error);
+ AdbcStatusCode ExecuteSchema(struct ArrowSchema* schema, struct AdbcError* error);
+ AdbcStatusCode GetOption(const char* key, char* value, size_t* length,
+ struct AdbcError* error);
+ AdbcStatusCode GetOptionBytes(const char* key, uint8_t* value, size_t* length,
+ struct AdbcError* error);
+ AdbcStatusCode GetOptionDouble(const char* key, double* value, struct AdbcError* error);
+ AdbcStatusCode GetOptionInt(const char* key, int64_t* value, struct AdbcError* error);
AdbcStatusCode GetParameterSchema(struct ArrowSchema* schema, struct AdbcError* error);
AdbcStatusCode New(struct AdbcConnection* connection, struct AdbcError* error);
AdbcStatusCode Prepare(struct AdbcError* error);
AdbcStatusCode Release(struct AdbcError* error);
AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error);
+ AdbcStatusCode SetOptionBytes(const char* key, const uint8_t* value, size_t length,
+ struct AdbcError* error);
+ AdbcStatusCode SetOptionDouble(const char* key, double value, struct AdbcError* error);
+ AdbcStatusCode SetOptionInt(const char* key, int64_t value, struct AdbcError* error);
AdbcStatusCode SetSqlQuery(const char* query, struct AdbcError* error);
// ---------------------------------------------------------------------
@@ -122,6 +134,7 @@ class PostgresStatement {
AdbcStatusCode ExecutePreparedStatement(struct ArrowArrayStream* stream,
int64_t* rows_affected,
struct AdbcError* error);
+ AdbcStatusCode SetupReader(struct AdbcError* error);
private:
std::shared_ptr type_resolver_;
@@ -133,9 +146,16 @@ class PostgresStatement {
struct ArrowArrayStream bind_;
// Bulk ingest state
+ enum class IngestMode {
+ kCreate,
+ kAppend,
+ kReplace,
+ kCreateAppend,
+ };
+
struct {
std::string target;
- bool append = false;
+ IngestMode mode = IngestMode::kCreate;
} ingest_;
TupleReader reader_;
diff --git a/c/driver/snowflake/snowflake_test.cc b/c/driver/snowflake/snowflake_test.cc
index c8e08a568f..b3833d78c7 100644
--- a/c/driver/snowflake/snowflake_test.cc
+++ b/c/driver/snowflake/snowflake_test.cc
@@ -106,11 +106,11 @@ class SnowflakeQuirks : public adbc_validation::DriverQuirks {
}
std::string BindParameter(int index) const override { return "?"; }
+ bool supports_bulk_ingest(const char* /*mode*/) const override { return true; }
bool supports_concurrent_statements() const override { return true; }
bool supports_transactions() const override { return true; }
bool supports_get_sql_info() const override { return false; }
bool supports_get_objects() const override { return true; }
- bool supports_bulk_ingest() const override { return true; }
bool supports_partitioned_data() const override { return false; }
bool supports_dynamic_parameter_binding() const override { return false; }
bool ddl_implicit_commit_txn() const override { return true; }
diff --git a/c/driver/sqlite/sqlite.c b/c/driver/sqlite/sqlite.c
index 80da86746a..6d2f344562 100644
--- a/c/driver/sqlite/sqlite.c
+++ b/c/driver/sqlite/sqlite.c
@@ -86,6 +86,26 @@ AdbcStatusCode SqliteDatabaseSetOption(struct AdbcDatabase* database, const char
return ADBC_STATUS_NOT_IMPLEMENTED;
}
+AdbcStatusCode SqliteDatabaseSetOptionBytes(struct AdbcDatabase* database,
+ const char* key, const uint8_t* value,
+ size_t length, struct AdbcError* error) {
+ CHECK_DB_INIT(database, error);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteDatabaseSetOptionDouble(struct AdbcDatabase* database,
+ const char* key, double value,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(database, error);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteDatabaseSetOptionInt(struct AdbcDatabase* database, const char* key,
+ int64_t value, struct AdbcError* error) {
+ CHECK_DB_INIT(database, error);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
int OpenDatabase(const char* maybe_uri, sqlite3** db, struct AdbcError* error) {
const char* uri = maybe_uri ? maybe_uri : kDefaultUri;
int rc = sqlite3_open_v2(uri, db,
@@ -120,6 +140,33 @@ AdbcStatusCode ExecuteQuery(struct SqliteConnection* conn, const char* query,
return ADBC_STATUS_OK;
}
+AdbcStatusCode SqliteDatabaseGetOption(struct AdbcDatabase* database, const char* key,
+ char* value, size_t* length,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(database, error);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
+AdbcStatusCode SqliteDatabaseGetOptionBytes(struct AdbcDatabase* database,
+ const char* key, uint8_t* value,
+ size_t* length, struct AdbcError* error) {
+ CHECK_DB_INIT(database, error);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
+AdbcStatusCode SqliteDatabaseGetOptionDouble(struct AdbcDatabase* database,
+ const char* key, double* value,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(database, error);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
+AdbcStatusCode SqliteDatabaseGetOptionInt(struct AdbcDatabase* database, const char* key,
+ int64_t* value, struct AdbcError* error) {
+ CHECK_DB_INIT(database, error);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
AdbcStatusCode SqliteDatabaseInit(struct AdbcDatabase* database,
struct AdbcError* error) {
CHECK_DB_INIT(database, error);
@@ -204,6 +251,27 @@ AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection,
return ADBC_STATUS_NOT_IMPLEMENTED;
}
+AdbcStatusCode SqliteConnectionSetOptionBytes(struct AdbcConnection* connection,
+ const char* key, const uint8_t* value,
+ size_t length, struct AdbcError* error) {
+ CHECK_DB_INIT(connection, error);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteConnectionSetOptionDouble(struct AdbcConnection* connection,
+ const char* key, double value,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(connection, error);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteConnectionSetOptionInt(struct AdbcConnection* connection,
+ const char* key, int64_t value,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(connection, error);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
AdbcStatusCode SqliteConnectionInit(struct AdbcConnection* connection,
struct AdbcDatabase* database,
struct AdbcError* error) {
@@ -754,6 +822,34 @@ AdbcStatusCode SqliteConnectionGetObjects(struct AdbcConnection* connection, int
return BatchToArrayStream(&array, &schema, out, error);
}
+AdbcStatusCode SqliteConnectionGetOption(struct AdbcConnection* connection,
+ const char* key, char* value, size_t* length,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(connection, error);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
+AdbcStatusCode SqliteConnectionGetOptionBytes(struct AdbcConnection* connection,
+ const char* key, uint8_t* value,
+ size_t* length, struct AdbcError* error) {
+ CHECK_DB_INIT(connection, error);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
+AdbcStatusCode SqliteConnectionGetOptionDouble(struct AdbcConnection* connection,
+ const char* key, double* value,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(connection, error);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
+AdbcStatusCode SqliteConnectionGetOptionInt(struct AdbcConnection* connection,
+ const char* key, int64_t* value,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(connection, error);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
AdbcStatusCode SqliteConnectionGetTableSchema(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name,
@@ -1243,6 +1339,34 @@ AdbcStatusCode SqliteStatementBindStream(struct AdbcStatement* statement,
return AdbcSqliteBinderSetArrayStream(&stmt->binder, stream, error);
}
+AdbcStatusCode SqliteStatementGetOption(struct AdbcStatement* statement, const char* key,
+ char* value, size_t* length,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(statement, error);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
+AdbcStatusCode SqliteStatementGetOptionBytes(struct AdbcStatement* statement,
+ const char* key, uint8_t* value,
+ size_t* length, struct AdbcError* error) {
+ CHECK_DB_INIT(statement, error);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
+AdbcStatusCode SqliteStatementGetOptionDouble(struct AdbcStatement* statement,
+ const char* key, double* value,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(statement, error);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
+AdbcStatusCode SqliteStatementGetOptionInt(struct AdbcStatement* statement,
+ const char* key, int64_t* value,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(statement, error);
+ return ADBC_STATUS_NOT_FOUND;
+}
+
AdbcStatusCode SqliteStatementGetParameterSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
@@ -1332,6 +1456,27 @@ AdbcStatusCode SqliteStatementSetOption(struct AdbcStatement* statement, const c
return ADBC_STATUS_NOT_IMPLEMENTED;
}
+AdbcStatusCode SqliteStatementSetOptionBytes(struct AdbcStatement* statement,
+ const char* key, const uint8_t* value,
+ size_t length, struct AdbcError* error) {
+ CHECK_DB_INIT(statement, error);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteStatementSetOptionDouble(struct AdbcStatement* statement,
+ const char* key, double value,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(statement, error);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteStatementSetOptionInt(struct AdbcStatement* statement,
+ const char* key, int64_t value,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(statement, error);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
AdbcStatusCode SqliteStatementExecutePartitions(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcPartitions* partitions,
@@ -1382,24 +1527,91 @@ AdbcStatusCode SqliteDriverInit(int version, void* raw_driver, struct AdbcError*
// Public names
-AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
- return SqliteDatabaseNew(database, error);
+AdbcStatusCode AdbcDatabaseGetOption(struct AdbcDatabase* database, const char* key,
+ char* value, size_t* length,
+ struct AdbcError* error) {
+ return SqliteDatabaseGetOption(database, key, value, length, error);
}
-AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char* key,
- const char* value, struct AdbcError* error) {
- return SqliteDatabaseSetOption(database, key, value, error);
+AdbcStatusCode AdbcDatabaseGetOptionBytes(struct AdbcDatabase* database, const char* key,
+ uint8_t* value, size_t* length,
+ struct AdbcError* error) {
+ return SqliteDatabaseGetOptionBytes(database, key, value, length, error);
+}
+
+AdbcStatusCode AdbcDatabaseGetOptionInt(struct AdbcDatabase* database, const char* key,
+ int64_t* value, struct AdbcError* error) {
+ return SqliteDatabaseGetOptionInt(database, key, value, error);
+}
+
+AdbcStatusCode AdbcDatabaseGetOptionDouble(struct AdbcDatabase* database, const char* key,
+ double* value, struct AdbcError* error) {
+ return SqliteDatabaseGetOptionDouble(database, key, value, error);
}
AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) {
return SqliteDatabaseInit(database, error);
}
+AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
+ return SqliteDatabaseNew(database, error);
+}
+
AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
struct AdbcError* error) {
return SqliteDatabaseRelease(database, error);
}
+AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char* key,
+ const char* value, struct AdbcError* error) {
+ return SqliteDatabaseSetOption(database, key, value, error);
+}
+
+AdbcStatusCode AdbcDatabaseSetOptionBytes(struct AdbcDatabase* database, const char* key,
+ const uint8_t* value, size_t length,
+ struct AdbcError* error) {
+ return SqliteDatabaseSetOptionBytes(database, key, value, length, error);
+}
+
+AdbcStatusCode AdbcDatabaseSetOptionInt(struct AdbcDatabase* database, const char* key,
+ int64_t value, struct AdbcError* error) {
+ return SqliteDatabaseSetOptionInt(database, key, value, error);
+}
+
+AdbcStatusCode AdbcDatabaseSetOptionDouble(struct AdbcDatabase* database, const char* key,
+ double value, struct AdbcError* error) {
+ return SqliteDatabaseSetOptionDouble(database, key, value, error);
+}
+
+AdbcStatusCode AdbcConnectionCancel(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode AdbcConnectionGetOption(struct AdbcConnection* connection, const char* key,
+ char* value, size_t* length,
+ struct AdbcError* error) {
+ return SqliteConnectionGetOption(connection, key, value, length, error);
+}
+
+AdbcStatusCode AdbcConnectionGetOptionBytes(struct AdbcConnection* connection,
+ const char* key, uint8_t* value,
+ size_t* length, struct AdbcError* error) {
+ return SqliteConnectionGetOptionBytes(connection, key, value, length, error);
+}
+
+AdbcStatusCode AdbcConnectionGetOptionInt(struct AdbcConnection* connection,
+ const char* key, int64_t* value,
+ struct AdbcError* error) {
+ return SqliteConnectionGetOptionInt(connection, key, value, error);
+}
+
+AdbcStatusCode AdbcConnectionGetOptionDouble(struct AdbcConnection* connection,
+ const char* key, double* value,
+ struct AdbcError* error) {
+ return SqliteConnectionGetOptionDouble(connection, key, value, error);
+}
+
AdbcStatusCode AdbcConnectionNew(struct AdbcConnection* connection,
struct AdbcError* error) {
return SqliteConnectionNew(connection, error);
@@ -1410,6 +1622,24 @@ AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const
return SqliteConnectionSetOption(connection, key, value, error);
}
+AdbcStatusCode AdbcConnectionSetOptionBytes(struct AdbcConnection* connection,
+ const char* key, const uint8_t* value,
+ size_t length, struct AdbcError* error) {
+ return SqliteConnectionSetOptionBytes(connection, key, value, length, error);
+}
+
+AdbcStatusCode AdbcConnectionSetOptionInt(struct AdbcConnection* connection,
+ const char* key, int64_t value,
+ struct AdbcError* error) {
+ return SqliteConnectionSetOptionInt(connection, key, value, error);
+}
+
+AdbcStatusCode AdbcConnectionSetOptionDouble(struct AdbcConnection* connection,
+ const char* key, double value,
+ struct AdbcError* error) {
+ return SqliteConnectionSetOptionDouble(connection, key, value, error);
+}
+
AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
struct AdbcDatabase* database,
struct AdbcError* error) {
@@ -1472,6 +1702,11 @@ AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
return SqliteConnectionRollback(connection, error);
}
+AdbcStatusCode AdbcStatementCancel(struct AdbcStatement* statement,
+ struct AdbcError* error) {
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
AdbcStatusCode AdbcStatementNew(struct AdbcConnection* connection,
struct AdbcStatement* statement,
struct AdbcError* error) {
@@ -1490,6 +1725,12 @@ AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
return SqliteStatementExecuteQuery(statement, out, rows_affected, error);
}
+AdbcStatusCode AdbcStatementExecuteSchema(struct AdbcStatement* statement,
+ struct ArrowSchema* schema,
+ struct AdbcError* error) {
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
AdbcStatusCode AdbcStatementPrepare(struct AdbcStatement* statement,
struct AdbcError* error) {
return SqliteStatementPrepare(statement, error);
@@ -1518,6 +1759,29 @@ AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
return SqliteStatementBindStream(statement, stream, error);
}
+AdbcStatusCode AdbcStatementGetOption(struct AdbcStatement* statement, const char* key,
+ char* value, size_t* length,
+ struct AdbcError* error) {
+ return SqliteStatementGetOption(statement, key, value, length, error);
+}
+
+AdbcStatusCode AdbcStatementGetOptionBytes(struct AdbcStatement* statement,
+ const char* key, uint8_t* value,
+ size_t* length, struct AdbcError* error) {
+ return SqliteStatementGetOptionBytes(statement, key, value, length, error);
+}
+
+AdbcStatusCode AdbcStatementGetOptionInt(struct AdbcStatement* statement, const char* key,
+ int64_t* value, struct AdbcError* error) {
+ return SqliteStatementGetOptionInt(statement, key, value, error);
+}
+
+AdbcStatusCode AdbcStatementGetOptionDouble(struct AdbcStatement* statement,
+ const char* key, double* value,
+ struct AdbcError* error) {
+ return SqliteStatementGetOptionDouble(statement, key, value, error);
+}
+
AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
@@ -1529,6 +1793,23 @@ AdbcStatusCode AdbcStatementSetOption(struct AdbcStatement* statement, const cha
return SqliteStatementSetOption(statement, key, value, error);
}
+AdbcStatusCode AdbcStatementSetOptionBytes(struct AdbcStatement* statement,
+ const char* key, const uint8_t* value,
+ size_t length, struct AdbcError* error) {
+ return SqliteStatementSetOptionBytes(statement, key, value, length, error);
+}
+
+AdbcStatusCode AdbcStatementSetOptionInt(struct AdbcStatement* statement, const char* key,
+ int64_t value, struct AdbcError* error) {
+ return SqliteStatementSetOptionInt(statement, key, value, error);
+}
+
+AdbcStatusCode AdbcStatementSetOptionDouble(struct AdbcStatement* statement,
+ const char* key, double value,
+ struct AdbcError* error) {
+ return SqliteStatementSetOptionDouble(statement, key, value, error);
+}
+
AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcPartitions* partitions,
diff --git a/c/driver/sqlite/sqlite_test.cc b/c/driver/sqlite/sqlite_test.cc
index 3ab21fe3fa..13ab64aa49 100644
--- a/c/driver/sqlite/sqlite_test.cc
+++ b/c/driver/sqlite/sqlite_test.cc
@@ -91,7 +91,27 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
return ddl;
}
+ bool supports_bulk_ingest(const char* mode) const override {
+ return std::strcmp(mode, ADBC_INGEST_OPTION_MODE_APPEND) == 0 ||
+ std::strcmp(mode, ADBC_INGEST_OPTION_MODE_CREATE) == 0;
+ }
bool supports_concurrent_statements() const override { return true; }
+ bool supports_get_option() const override { return false; }
+ std::optional supports_get_sql_info(
+ uint32_t info_code) const override {
+ switch (info_code) {
+ case ADBC_INFO_DRIVER_NAME:
+ return "ADBC SQLite Driver";
+ case ADBC_INFO_DRIVER_VERSION:
+ return "(unknown)";
+ case ADBC_INFO_VENDOR_NAME:
+ return "SQLite";
+ case ADBC_INFO_VENDOR_VERSION:
+ return "3.";
+ default:
+ return std::nullopt;
+ }
+ }
std::string catalog() const override { return "main"; }
std::string db_schema() const override { return ""; }
diff --git a/c/driver_manager/CMakeLists.txt b/c/driver_manager/CMakeLists.txt
index 637843fff5..6fb51d9a6a 100644
--- a/c/driver_manager/CMakeLists.txt
+++ b/c/driver_manager/CMakeLists.txt
@@ -55,10 +55,9 @@ if(ADBC_BUILD_TESTS)
driver-manager
SOURCES
adbc_driver_manager_test.cc
- ../validation/adbc_validation.cc
- ../validation/adbc_validation_util.cc
EXTRA_LINK_LIBS
adbc_driver_common
+ adbc_validation
nanoarrow
${TEST_LINK_LIBS})
target_compile_features(adbc-driver-manager-test PRIVATE cxx_std_17)
@@ -73,8 +72,8 @@ if(ADBC_BUILD_TESTS)
SOURCES
adbc_version_100.c
adbc_version_100_compatibility_test.cc
- ../validation/adbc_validation_util.cc
EXTRA_LINK_LIBS
+ adbc_validation_util
nanoarrow
${TEST_LINK_LIBS})
target_compile_features(adbc-version-100-compatibility-test PRIVATE cxx_std_17)
diff --git a/c/driver_manager/adbc_driver_manager.cc b/c/driver_manager/adbc_driver_manager.cc
index 1d35482ac9..ea4ddc4c55 100644
--- a/c/driver_manager/adbc_driver_manager.cc
+++ b/c/driver_manager/adbc_driver_manager.cc
@@ -132,23 +132,23 @@ static AdbcStatusCode ReleaseDriver(struct AdbcDriver* driver, struct AdbcError*
AdbcStatusCode DatabaseGetOption(struct AdbcDatabase* database, const char* key,
char* value, size_t* length, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode DatabaseGetOptionBytes(struct AdbcDatabase* database, const char* key,
uint8_t* value, size_t* length,
struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode DatabaseGetOptionInt(struct AdbcDatabase* database, const char* key,
int64_t* value, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode DatabaseGetOptionDouble(struct AdbcDatabase* database, const char* key,
double* value, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode DatabaseSetOption(struct AdbcDatabase* database, const char* key,
@@ -195,24 +195,24 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection*, int, const char*, co
AdbcStatusCode ConnectionGetOption(struct AdbcConnection* connection, const char* key,
char* value, size_t* length, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode ConnectionGetOptionBytes(struct AdbcConnection* connection,
const char* key, uint8_t* value, size_t* length,
struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode ConnectionGetOptionInt(struct AdbcConnection* connection, const char* key,
int64_t* value, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode ConnectionGetOptionDouble(struct AdbcConnection* connection,
const char* key, double* value,
struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode ConnectionGetTableSchema(struct AdbcConnection*, const char*, const char*,
@@ -284,23 +284,23 @@ AdbcStatusCode StatementExecuteSchema(struct AdbcStatement* statement,
AdbcStatusCode StatementGetOption(struct AdbcStatement* statement, const char* key,
char* value, size_t* length, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode StatementGetOptionBytes(struct AdbcStatement* statement, const char* key,
uint8_t* value, size_t* length,
struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode StatementGetOptionInt(struct AdbcStatement* statement, const char* key,
int64_t* value, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode StatementGetOptionDouble(struct AdbcStatement* statement, const char* key,
double* value, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode StatementGetParameterSchema(struct AdbcStatement* statement,
@@ -535,11 +535,11 @@ AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError*
// So we don't confuse a driver into thinking it's initialized already
database->private_data = nullptr;
if (args->init_func) {
- status = AdbcLoadDriverFromInitFunc(args->init_func, ADBC_VERSION_1_0_0,
+ status = AdbcLoadDriverFromInitFunc(args->init_func, ADBC_VERSION_1_1_0,
database->private_driver, error);
} else {
status = AdbcLoadDriver(args->driver.c_str(), args->entrypoint.c_str(),
- ADBC_VERSION_1_0_0, database->private_driver, error);
+ ADBC_VERSION_1_1_0, database->private_driver, error);
}
if (status != ADBC_STATUS_OK) {
// Restore private_data so it will be released by AdbcDatabaseRelease
@@ -1375,30 +1375,30 @@ AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int vers
auto* driver = reinterpret_cast(raw_driver);
FILL_DEFAULT(driver, DatabaseGetOption);
FILL_DEFAULT(driver, DatabaseGetOptionBytes);
- FILL_DEFAULT(driver, DatabaseGetOptionInt);
FILL_DEFAULT(driver, DatabaseGetOptionDouble);
+ FILL_DEFAULT(driver, DatabaseGetOptionInt);
FILL_DEFAULT(driver, DatabaseSetOptionBytes);
- FILL_DEFAULT(driver, DatabaseSetOptionInt);
FILL_DEFAULT(driver, DatabaseSetOptionDouble);
+ FILL_DEFAULT(driver, DatabaseSetOptionInt);
FILL_DEFAULT(driver, ConnectionCancel);
FILL_DEFAULT(driver, ConnectionGetOption);
FILL_DEFAULT(driver, ConnectionGetOptionBytes);
- FILL_DEFAULT(driver, ConnectionGetOptionInt);
FILL_DEFAULT(driver, ConnectionGetOptionDouble);
+ FILL_DEFAULT(driver, ConnectionGetOptionInt);
FILL_DEFAULT(driver, ConnectionSetOptionBytes);
- FILL_DEFAULT(driver, ConnectionSetOptionInt);
FILL_DEFAULT(driver, ConnectionSetOptionDouble);
+ FILL_DEFAULT(driver, ConnectionSetOptionInt);
FILL_DEFAULT(driver, StatementCancel);
FILL_DEFAULT(driver, StatementExecuteSchema);
FILL_DEFAULT(driver, StatementGetOption);
FILL_DEFAULT(driver, StatementGetOptionBytes);
- FILL_DEFAULT(driver, StatementGetOptionInt);
FILL_DEFAULT(driver, StatementGetOptionDouble);
+ FILL_DEFAULT(driver, StatementGetOptionInt);
FILL_DEFAULT(driver, StatementSetOptionBytes);
- FILL_DEFAULT(driver, StatementSetOptionInt);
FILL_DEFAULT(driver, StatementSetOptionDouble);
+ FILL_DEFAULT(driver, StatementSetOptionInt);
}
return ADBC_STATUS_OK;
diff --git a/c/driver_manager/adbc_driver_manager_test.cc b/c/driver_manager/adbc_driver_manager_test.cc
index 4475bd1964..6b5b1c3026 100644
--- a/c/driver_manager/adbc_driver_manager_test.cc
+++ b/c/driver_manager/adbc_driver_manager_test.cc
@@ -40,7 +40,7 @@ class DriverManager : public ::testing::Test {
std::memset(&driver, 0, sizeof(driver));
std::memset(&error, 0, sizeof(error));
- ASSERT_THAT(AdbcLoadDriver("adbc_driver_sqlite", nullptr, ADBC_VERSION_1_0_0, &driver,
+ ASSERT_THAT(AdbcLoadDriver("adbc_driver_sqlite", nullptr, ADBC_VERSION_1_1_0, &driver,
&error),
IsOkStatus(&error));
}
@@ -191,7 +191,27 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
}
}
+ bool supports_bulk_ingest(const char* mode) const override {
+ return std::strcmp(mode, ADBC_INGEST_OPTION_MODE_APPEND) == 0 ||
+ std::strcmp(mode, ADBC_INGEST_OPTION_MODE_CREATE) == 0;
+ }
bool supports_concurrent_statements() const override { return true; }
+ bool supports_get_option() const override { return false; }
+ std::optional supports_get_sql_info(
+ uint32_t info_code) const override {
+ switch (info_code) {
+ case ADBC_INFO_DRIVER_NAME:
+ return "ADBC SQLite Driver";
+ case ADBC_INFO_DRIVER_VERSION:
+ return "(unknown)";
+ case ADBC_INFO_VENDOR_NAME:
+ return "SQLite";
+ case ADBC_INFO_VENDOR_VERSION:
+ return "3.";
+ default:
+ return std::nullopt;
+ }
+ }
};
class SqliteDatabaseTest : public ::testing::Test, public adbc_validation::DatabaseTest {
diff --git a/c/integration/duckdb/duckdb_test.cc b/c/integration/duckdb/duckdb_test.cc
index fd6e1984e6..be163418d2 100644
--- a/c/integration/duckdb/duckdb_test.cc
+++ b/c/integration/duckdb/duckdb_test.cc
@@ -46,7 +46,7 @@ class DuckDbQuirks : public adbc_validation::DriverQuirks {
std::string BindParameter(int index) const override { return "?"; }
- bool supports_bulk_ingest() const override { return false; }
+ bool supports_bulk_ingest(const char* /*mode*/) const override { return false; }
bool supports_concurrent_statements() const override { return true; }
bool supports_dynamic_parameter_binding() const override { return false; }
bool supports_get_sql_info() const override { return false; }
@@ -96,6 +96,8 @@ class DuckDbStatementTest : public ::testing::Test,
void TestSqlIngestTableEscaping() { GTEST_SKIP() << "Table escaping not implemented"; }
+ void TestSqlQueryErrors() { GTEST_SKIP() << "DuckDB does not set AdbcError.release"; }
+
protected:
DuckDbQuirks quirks_;
};
diff --git a/c/validation/CMakeLists.txt b/c/validation/CMakeLists.txt
index 2f6549b5e7..c76c63d9df 100644
--- a/c/validation/CMakeLists.txt
+++ b/c/validation/CMakeLists.txt
@@ -15,11 +15,24 @@
# specific language governing permissions and limitations
# under the License.
-add_library(adbc_validation OBJECT adbc_validation.cc adbc_validation_util.cc)
+add_library(adbc_validation_util STATIC adbc_validation_util.cc)
+adbc_configure_target(adbc_validation_util)
+target_compile_features(adbc_validation_util PRIVATE cxx_std_17)
+target_include_directories(adbc_validation_util SYSTEM
+ PRIVATE "${REPOSITORY_ROOT}" "${REPOSITORY_ROOT}/c/driver/"
+ "${REPOSITORY_ROOT}/c/vendor/")
+target_link_libraries(adbc_validation_util PUBLIC adbc_driver_common nanoarrow
+ GTest::gtest GTest::gmock)
+
+add_library(adbc_validation STATIC adbc_validation.cc)
adbc_configure_target(adbc_validation)
target_compile_features(adbc_validation PRIVATE cxx_std_17)
target_include_directories(adbc_validation SYSTEM
PRIVATE "${REPOSITORY_ROOT}" "${REPOSITORY_ROOT}/c/driver/"
"${REPOSITORY_ROOT}/c/vendor/")
-target_link_libraries(adbc_validation PUBLIC adbc_driver_common nanoarrow GTest::gtest
- GTest::gmock)
+target_link_libraries(adbc_validation
+ PUBLIC adbc_driver_common
+ adbc_validation_util
+ nanoarrow
+ GTest::gtest
+ GTest::gmock)
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index 094e53cf2c..46c3daf187 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -34,6 +34,7 @@
#include
#include
#include
+#include
#include "adbc_validation_util.h"
@@ -102,7 +103,7 @@ AdbcStatusCode DriverQuirks::EnsureSampleTable(struct AdbcConnection* connection
AdbcStatusCode DriverQuirks::CreateSampleTable(struct AdbcConnection* connection,
const std::string& name,
struct AdbcError* error) const {
- if (!supports_bulk_ingest()) {
+ if (!supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}
return DoIngestSampleTable(connection, name, error);
@@ -248,6 +249,56 @@ void ConnectionTest::TestAutocommitToggle() {
//------------------------------------------------------------
// Tests of metadata
+std::optional ConnectionGetOption(struct AdbcConnection* connection,
+ std::string_view option,
+ struct AdbcError* error) {
+ char buffer[128];
+ size_t buffer_size = sizeof(buffer);
+ AdbcStatusCode status =
+ AdbcConnectionGetOption(connection, option.data(), buffer, &buffer_size, error);
+ EXPECT_THAT(status, IsOkStatus(error));
+ if (status != ADBC_STATUS_OK) return std::nullopt;
+ EXPECT_GT(buffer_size, 0);
+ if (buffer_size == 0) return std::nullopt;
+ return std::string(buffer, buffer_size - 1);
+}
+
+void ConnectionTest::TestMetadataCurrentCatalog() {
+ ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
+ ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
+
+ if (quirks()->supports_metadata_current_catalog()) {
+ ASSERT_THAT(
+ ConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_CURRENT_CATALOG, &error),
+ ::testing::Optional(quirks()->catalog()));
+ } else {
+ char buffer[128];
+ size_t buffer_size = sizeof(buffer);
+ ASSERT_THAT(
+ AdbcConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_CURRENT_CATALOG,
+ buffer, &buffer_size, &error),
+ IsStatus(ADBC_STATUS_NOT_FOUND));
+ }
+}
+
+void ConnectionTest::TestMetadataCurrentDbSchema() {
+ ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
+ ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
+
+ if (quirks()->supports_metadata_current_db_schema()) {
+ ASSERT_THAT(ConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA,
+ &error),
+ ::testing::Optional(quirks()->db_schema()));
+ } else {
+ char buffer[128];
+ size_t buffer_size = sizeof(buffer);
+ ASSERT_THAT(
+ AdbcConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA,
+ buffer, &buffer_size, &error),
+ IsStatus(ADBC_STATUS_NOT_FOUND));
+ }
+}
+
void ConnectionTest::TestMetadataGetInfo() {
ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
@@ -263,6 +314,11 @@ void ConnectionTest::TestMetadataGetInfo() {
ADBC_INFO_VENDOR_NAME,
ADBC_INFO_VENDOR_VERSION,
}) {
+ SCOPED_TRACE("info_code = " + std::to_string(info_code));
+ std::optional expected = quirks()->supports_get_sql_info(info_code);
+
+ if (!expected.has_value()) continue;
+
uint32_t info[] = {info_code};
StreamReader reader;
@@ -316,8 +372,10 @@ void ConnectionTest::TestMetadataGetInfo() {
const uint32_t code =
reader.array_view->children[0]->buffer_views[1].data.as_uint32[row];
seen.push_back(code);
+ if (code != info_code) {
+ continue;
+ }
- std::optional expected = quirks()->supports_get_sql_info(code);
ASSERT_TRUE(expected.has_value()) << "Got unexpected info code " << code;
uint8_t type_code =
@@ -329,16 +387,16 @@ void ConnectionTest::TestMetadataGetInfo() {
using T = std::decay_t;
if constexpr (std::is_same_v) {
ASSERT_EQ(uint8_t(2), type_code);
- ASSERT_EQ(expected_value,
+ EXPECT_EQ(expected_value,
ArrowArrayViewGetIntUnsafe(
reader.array_view->children[1]->children[2], offset));
} else if constexpr (std::is_same_v) {
ASSERT_EQ(uint8_t(0), type_code);
struct ArrowStringView view = ArrowArrayViewGetStringUnsafe(
reader.array_view->children[1]->children[0], offset);
- ASSERT_EQ(expected_value,
- std::string_view(static_cast(view.data),
- view.size_bytes));
+ EXPECT_THAT(std::string_view(static_cast(view.data),
+ view.size_bytes),
+ ::testing::HasSubstr(expected_value));
} else {
static_assert(!sizeof(T), "not yet implemented");
}
@@ -347,12 +405,12 @@ void ConnectionTest::TestMetadataGetInfo() {
<< "code: " << type_code;
}
}
- ASSERT_THAT(seen, ::testing::IsSupersetOf(info));
+ EXPECT_THAT(seen, ::testing::IsSupersetOf(info));
}
}
void ConnectionTest::TestMetadataGetTableSchema() {
- if (!quirks()->supports_bulk_ingest()) {
+ if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
@@ -932,6 +990,33 @@ void ConnectionTest::TestMetadataGetObjectsPrimaryKey() {
ASSERT_EQ(constraint_column_name, "id");
}
+void ConnectionTest::TestMetadataGetObjectsCancel() {
+ if (!quirks()->supports_cancel() || !quirks()->supports_get_objects()) {
+ GTEST_SKIP();
+ }
+
+ ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
+ ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
+
+ StreamReader reader;
+ ASSERT_THAT(
+ AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_CATALOGS, nullptr, nullptr,
+ nullptr, nullptr, nullptr, &reader.stream.value, &error),
+ IsOkStatus(&error));
+ ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+
+ ASSERT_THAT(AdbcConnectionCancel(&connection, &error), IsOkStatus(&error));
+
+ while (true) {
+ int err = reader.MaybeNext();
+ if (err != 0) {
+ ASSERT_THAT(err, ::testing::AnyOf(0, IsErrno(ECANCELED, &reader.stream.value,
+ /*ArrowError*/ nullptr)));
+ }
+ if (!reader.array->release) break;
+ }
+}
+
//------------------------------------------------------------
// Tests of AdbcStatement
@@ -986,7 +1071,7 @@ void StatementTest::TestRelease() {
template
void StatementTest::TestSqlIngestType(ArrowType type,
const std::vector>& values) {
- if (!quirks()->supports_bulk_ingest()) {
+ if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
@@ -1118,7 +1203,7 @@ void StatementTest::TestSqlIngestBinary() {
template
void StatementTest::TestSqlIngestTemporalType(const char* timezone) {
- if (!quirks()->supports_bulk_ingest()) {
+ if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
@@ -1240,7 +1325,8 @@ void StatementTest::TestSqlIngestTableEscaping() {
}
void StatementTest::TestSqlIngestAppend() {
- if (!quirks()->supports_bulk_ingest()) {
+ if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE) ||
+ !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_APPEND)) {
GTEST_SKIP();
}
@@ -1318,8 +1404,185 @@ void StatementTest::TestSqlIngestAppend() {
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
+void StatementTest::TestSqlIngestReplace() {
+ if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_REPLACE)) {
+ GTEST_SKIP();
+ }
+
+ // Ingest
+
+ Handle schema;
+ Handle array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+ ASSERT_THAT(MakeBatch(&schema.value, &array.value, &na_error, {42}),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+ "bulk_ingest", &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_REPLACE, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
+ IsOkStatus(&error));
+
+ int64_t rows_affected = 0;
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
+
+ // Read data back
+ ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM bulk_ingest", &error),
+ IsOkStatus(&error));
+ {
+ StreamReader reader;
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+ &reader.rows_affected, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(reader.rows_affected,
+ ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
+
+ ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+ ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
+ {{"int64s", NANOARROW_TYPE_INT64, NULLABLE}}));
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NE(nullptr, reader.array->release);
+ ASSERT_EQ(1, reader.array->length);
+ ASSERT_EQ(1, reader.array->n_children);
+
+ ASSERT_NO_FATAL_FAILURE(CompareArray(reader.array_view->children[0], {42}));
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_EQ(nullptr, reader.array->release);
+ }
+
+ // Replace
+ // Re-initialize since Bind() should take ownership of data
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+ ASSERT_THAT(MakeBatch(&schema.value, &array.value, &na_error, {-42, -42}),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+ "bulk_ingest", &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_REPLACE, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
+ IsOkStatus(&error));
+
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(2), ::testing::Eq(-1)));
+
+ // Read data back
+ ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM bulk_ingest", &error),
+ IsOkStatus(&error));
+ {
+ StreamReader reader;
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+ &reader.rows_affected, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(reader.rows_affected,
+ ::testing::AnyOf(::testing::Eq(2), ::testing::Eq(-1)));
+
+ ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+ ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
+ {{"int64s", NANOARROW_TYPE_INT64, NULLABLE}}));
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NE(nullptr, reader.array->release);
+ ASSERT_EQ(2, reader.array->length);
+ ASSERT_EQ(1, reader.array->n_children);
+
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray(reader.array_view->children[0], {-42, -42}));
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_EQ(nullptr, reader.array->release);
+ }
+}
+
+void StatementTest::TestSqlIngestCreateAppend() {
+ if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE_APPEND)) {
+ GTEST_SKIP();
+ }
+
+ ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
+ IsOkStatus(&error));
+
+ // Ingest
+
+ Handle schema;
+ Handle array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+ ASSERT_THAT(MakeBatch(&schema.value, &array.value, &na_error, {42}),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+ "bulk_ingest", &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_CREATE_APPEND, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
+ IsOkStatus(&error));
+
+ int64_t rows_affected = 0;
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
+
+ // Append
+ // Re-initialize since Bind() should take ownership of data
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+ ASSERT_THAT(MakeBatch(&schema.value, &array.value, &na_error, {42, 42}),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
+ IsOkStatus(&error));
+
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(2), ::testing::Eq(-1)));
+
+ // Read data back
+ ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM bulk_ingest", &error),
+ IsOkStatus(&error));
+ {
+ StreamReader reader;
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+ &reader.rows_affected, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(reader.rows_affected,
+ ::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1)));
+
+ ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+ ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
+ {{"int64s", NANOARROW_TYPE_INT64, NULLABLE}}));
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NE(nullptr, reader.array->release);
+ ASSERT_EQ(3, reader.array->length);
+ ASSERT_EQ(1, reader.array->n_children);
+
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray(reader.array_view->children[0], {42, 42, 42}));
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_EQ(nullptr, reader.array->release);
+ }
+
+ ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
+}
+
void StatementTest::TestSqlIngestErrors() {
- if (!quirks()->supports_bulk_ingest()) {
+ if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
@@ -1399,7 +1662,7 @@ void StatementTest::TestSqlIngestErrors() {
}
void StatementTest::TestSqlIngestMultipleConnections() {
- if (!quirks()->supports_bulk_ingest()) {
+ if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
@@ -1469,7 +1732,7 @@ void StatementTest::TestSqlIngestMultipleConnections() {
}
void StatementTest::TestSqlIngestSample() {
- if (!quirks()->supports_bulk_ingest()) {
+ if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
@@ -1715,7 +1978,7 @@ void StatementTest::TestSqlPrepareSelectParams() {
}
void StatementTest::TestSqlPrepareUpdate() {
- if (!quirks()->supports_bulk_ingest() ||
+ if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE) ||
!quirks()->supports_dynamic_parameter_binding()) {
GTEST_SKIP();
}
@@ -1794,7 +2057,7 @@ void StatementTest::TestSqlPrepareUpdateNoParams() {
}
void StatementTest::TestSqlPrepareUpdateStream() {
- if (!quirks()->supports_bulk_ingest() ||
+ if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE) ||
!quirks()->supports_dynamic_parameter_binding()) {
GTEST_SKIP();
}
@@ -2069,6 +2332,36 @@ void StatementTest::TestSqlQueryStrings() {
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
+void StatementTest::TestSqlQueryCancel() {
+ if (!quirks()->supports_cancel()) {
+ GTEST_SKIP();
+ }
+
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 'SaShiSuSeSo'", &error),
+ IsOkStatus(&error));
+
+ {
+ StreamReader reader;
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+ &reader.rows_affected, &error),
+ IsOkStatus(&error));
+ ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+
+ ASSERT_THAT(AdbcStatementCancel(&statement, &error), IsOkStatus(&error));
+ while (true) {
+ int err = reader.MaybeNext();
+ if (err != 0) {
+ ASSERT_THAT(err, ::testing::AnyOf(0, IsErrno(ECANCELED, &reader.stream.value,
+ /*ArrowError*/ nullptr)));
+ }
+ if (!reader.array->release) break;
+ }
+ }
+
+ ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
+}
+
void StatementTest::TestSqlQueryErrors() {
// Invalid query
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
@@ -2088,6 +2381,13 @@ void StatementTest::TestTransactions() {
ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
IsOkStatus(&error));
+ if (quirks()->supports_get_option()) {
+ auto autocommit =
+ ConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, &error);
+ ASSERT_THAT(autocommit,
+ ::testing::Optional(::testing::StrEq(ADBC_OPTION_VALUE_ENABLED)));
+ }
+
Handle connection2;
ASSERT_THAT(AdbcConnectionNew(&connection2.value, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection2.value, &database, &error),
@@ -2097,6 +2397,13 @@ void StatementTest::TestTransactions() {
ADBC_OPTION_VALUE_DISABLED, &error),
IsOkStatus(&error));
+ if (quirks()->supports_get_option()) {
+ auto autocommit =
+ ConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, &error);
+ ASSERT_THAT(autocommit,
+ ::testing::Optional(::testing::StrEq(ADBC_OPTION_VALUE_DISABLED)));
+ }
+
// Uncommitted change
ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
@@ -2172,6 +2479,86 @@ void StatementTest::TestTransactions() {
}
}
+void StatementTest::TestSqlSchemaInts() {
+ if (!quirks()->supports_execute_schema()) {
+ GTEST_SKIP() << "Not supported";
+ }
+
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error),
+ IsOkStatus(&error));
+
+ nanoarrow::UniqueSchema schema;
+ ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error),
+ IsOkStatus(&error));
+
+ ASSERT_EQ(1, schema->n_children);
+ ASSERT_THAT(schema->children[0]->format, ::testing::AnyOfArray({
+ ::testing::StrEq("i"), // int32
+ ::testing::StrEq("l"), // int64
+ }));
+
+ ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
+}
+
+void StatementTest::TestSqlSchemaFloats() {
+ if (!quirks()->supports_execute_schema()) {
+ GTEST_SKIP() << "Not supported";
+ }
+
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT CAST(1.5 AS FLOAT)", &error),
+ IsOkStatus(&error));
+
+ nanoarrow::UniqueSchema schema;
+ ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error),
+ IsOkStatus(&error));
+
+ ASSERT_EQ(1, schema->n_children);
+ ASSERT_THAT(schema->children[0]->format, ::testing::AnyOfArray({
+ ::testing::StrEq("f"), // float32
+ ::testing::StrEq("g"), // float64
+ }));
+
+ ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
+}
+
+void StatementTest::TestSqlSchemaStrings() {
+ if (!quirks()->supports_execute_schema()) {
+ GTEST_SKIP() << "Not supported";
+ }
+
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 'hi'", &error),
+ IsOkStatus(&error));
+
+ nanoarrow::UniqueSchema schema;
+ ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error),
+ IsOkStatus(&error));
+
+ ASSERT_EQ(1, schema->n_children);
+ ASSERT_THAT(schema->children[0]->format, ::testing::AnyOfArray({
+ ::testing::StrEq("u"), // string
+ ::testing::StrEq("U"), // large_string
+ }));
+
+ ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
+}
+
+void StatementTest::TestSqlSchemaErrors() {
+ if (!quirks()->supports_execute_schema()) {
+ GTEST_SKIP() << "Not supported";
+ }
+
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
+
+ nanoarrow::UniqueSchema schema;
+ ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error),
+ IsStatus(ADBC_STATUS_INVALID_STATE, &error));
+
+ ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
+}
+
void StatementTest::TestConcurrentStatements() {
Handle statement1;
Handle statement2;
diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h
index 54622ba5ca..5e560d45f5 100644
--- a/c/validation/adbc_validation.h
+++ b/c/validation/adbc_validation.h
@@ -88,10 +88,22 @@ class DriverQuirks {
return ingest_type;
}
+ /// \brief Whether bulk ingest is supported
+ virtual bool supports_bulk_ingest(const char* mode) const { return true; }
+
+ /// \brief Whether we can cancel queries.
+ virtual bool supports_cancel() const { return false; }
+
/// \brief Whether two statements can be used at the same time on a
/// single connection
virtual bool supports_concurrent_statements() const { return false; }
+ /// \brief Whether AdbcStatementExecuteSchema should work
+ virtual bool supports_execute_schema() const { return false; }
+
+ /// \brief Whether GetOption* should work
+ virtual bool supports_get_option() const { return true; }
+
/// \brief Whether AdbcStatementExecutePartitions should work
virtual bool supports_partitioned_data() const { return false; }
@@ -112,8 +124,11 @@ class DriverQuirks {
/// \brief Whether GetObjects is implemented
virtual bool supports_get_objects() const { return true; }
- /// \brief Whether bulk ingest is supported
- virtual bool supports_bulk_ingest() const { return true; }
+ /// \brief Whether we can get ADBC_CONNECTION_OPTION_CURRENT_CATALOG
+ virtual bool supports_metadata_current_catalog() const { return false; }
+
+ /// \brief Whether we can get ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA
+ virtual bool supports_metadata_current_db_schema() const { return false; }
/// \brief Whether dynamic parameter bindings are supported for prepare
virtual bool supports_dynamic_parameter_binding() const { return true; }
@@ -165,6 +180,9 @@ class ConnectionTest {
void TestAutocommitToggle();
+ void TestMetadataCurrentCatalog();
+ void TestMetadataCurrentDbSchema();
+
void TestMetadataGetInfo();
void TestMetadataGetTableSchema();
void TestMetadataGetTableTypes();
@@ -176,6 +194,7 @@ class ConnectionTest {
void TestMetadataGetObjectsColumns();
void TestMetadataGetObjectsConstraints();
void TestMetadataGetObjectsPrimaryKey();
+ void TestMetadataGetObjectsCancel();
protected:
struct AdbcError error;
@@ -183,28 +202,31 @@ class ConnectionTest {
struct AdbcConnection connection;
};
-#define ADBCV_TEST_CONNECTION(FIXTURE) \
- static_assert(std::is_base_of::value, \
- ADBCV_STRINGIFY(FIXTURE) " must inherit from ConnectionTest"); \
- TEST_F(FIXTURE, NewInit) { TestNewInit(); } \
- TEST_F(FIXTURE, Release) { TestRelease(); } \
- TEST_F(FIXTURE, Concurrent) { TestConcurrent(); } \
- TEST_F(FIXTURE, AutocommitDefault) { TestAutocommitDefault(); } \
- TEST_F(FIXTURE, AutocommitToggle) { TestAutocommitToggle(); } \
- TEST_F(FIXTURE, MetadataGetInfo) { TestMetadataGetInfo(); } \
- TEST_F(FIXTURE, MetadataGetTableSchema) { TestMetadataGetTableSchema(); } \
- TEST_F(FIXTURE, MetadataGetTableTypes) { TestMetadataGetTableTypes(); } \
- TEST_F(FIXTURE, MetadataGetObjectsCatalogs) { TestMetadataGetObjectsCatalogs(); } \
- TEST_F(FIXTURE, MetadataGetObjectsDbSchemas) { TestMetadataGetObjectsDbSchemas(); } \
- TEST_F(FIXTURE, MetadataGetObjectsTables) { TestMetadataGetObjectsTables(); } \
- TEST_F(FIXTURE, MetadataGetObjectsTablesTypes) { \
- TestMetadataGetObjectsTablesTypes(); \
- } \
- TEST_F(FIXTURE, MetadataGetObjectsColumns) { TestMetadataGetObjectsColumns(); } \
- TEST_F(FIXTURE, MetadataGetObjectsConstraints) { \
- TestMetadataGetObjectsConstraints(); \
- } \
- TEST_F(FIXTURE, MetadataGetObjectsPrimaryKey) { TestMetadataGetObjectsPrimaryKey(); }
+#define ADBCV_TEST_CONNECTION(FIXTURE) \
+ static_assert(std::is_base_of::value, \
+ ADBCV_STRINGIFY(FIXTURE) " must inherit from ConnectionTest"); \
+ TEST_F(FIXTURE, NewInit) { TestNewInit(); } \
+ TEST_F(FIXTURE, Release) { TestRelease(); } \
+ TEST_F(FIXTURE, Concurrent) { TestConcurrent(); } \
+ TEST_F(FIXTURE, AutocommitDefault) { TestAutocommitDefault(); } \
+ TEST_F(FIXTURE, AutocommitToggle) { TestAutocommitToggle(); } \
+ TEST_F(FIXTURE, MetadataCurrentCatalog) { TestMetadataCurrentCatalog(); } \
+ TEST_F(FIXTURE, MetadataCurrentDbSchema) { TestMetadataCurrentDbSchema(); } \
+ TEST_F(FIXTURE, MetadataGetInfo) { TestMetadataGetInfo(); } \
+ TEST_F(FIXTURE, MetadataGetTableSchema) { TestMetadataGetTableSchema(); } \
+ TEST_F(FIXTURE, MetadataGetTableTypes) { TestMetadataGetTableTypes(); } \
+ TEST_F(FIXTURE, MetadataGetObjectsCatalogs) { TestMetadataGetObjectsCatalogs(); } \
+ TEST_F(FIXTURE, MetadataGetObjectsDbSchemas) { TestMetadataGetObjectsDbSchemas(); } \
+ TEST_F(FIXTURE, MetadataGetObjectsTables) { TestMetadataGetObjectsTables(); } \
+ TEST_F(FIXTURE, MetadataGetObjectsTablesTypes) { \
+ TestMetadataGetObjectsTablesTypes(); \
+ } \
+ TEST_F(FIXTURE, MetadataGetObjectsColumns) { TestMetadataGetObjectsColumns(); } \
+ TEST_F(FIXTURE, MetadataGetObjectsConstraints) { \
+ TestMetadataGetObjectsConstraints(); \
+ } \
+ TEST_F(FIXTURE, MetadataGetObjectsPrimaryKey) { TestMetadataGetObjectsPrimaryKey(); } \
+ TEST_F(FIXTURE, MetadataGetObjectsCancel) { TestMetadataGetObjectsCancel(); }
class StatementTest {
public:
@@ -245,6 +267,8 @@ class StatementTest {
void TestSqlIngestTableEscaping();
void TestSqlIngestAppend();
+ void TestSqlIngestReplace();
+ void TestSqlIngestCreateAppend();
void TestSqlIngestErrors();
void TestSqlIngestMultipleConnections();
void TestSqlIngestSample();
@@ -264,8 +288,15 @@ class StatementTest {
void TestSqlQueryFloats();
void TestSqlQueryStrings();
+ void TestSqlQueryCancel();
void TestSqlQueryErrors();
+ void TestSqlSchemaInts();
+ void TestSqlSchemaFloats();
+ void TestSqlSchemaStrings();
+
+ void TestSqlSchemaErrors();
+
void TestTransactions();
void TestConcurrentStatements();
@@ -312,6 +343,8 @@ class StatementTest {
TEST_F(FIXTURE, SqlIngestTimestampTz) { TestSqlIngestTimestampTz(); } \
TEST_F(FIXTURE, SqlIngestTableEscaping) { TestSqlIngestTableEscaping(); } \
TEST_F(FIXTURE, SqlIngestAppend) { TestSqlIngestAppend(); } \
+ TEST_F(FIXTURE, SqlIngestReplace) { TestSqlIngestReplace(); } \
+ TEST_F(FIXTURE, SqlIngestCreateAppend) { TestSqlIngestCreateAppend(); } \
TEST_F(FIXTURE, SqlIngestErrors) { TestSqlIngestErrors(); } \
TEST_F(FIXTURE, SqlIngestMultipleConnections) { TestSqlIngestMultipleConnections(); } \
TEST_F(FIXTURE, SqlIngestSample) { TestSqlIngestSample(); } \
@@ -329,7 +362,12 @@ class StatementTest {
TEST_F(FIXTURE, SqlQueryInts) { TestSqlQueryInts(); } \
TEST_F(FIXTURE, SqlQueryFloats) { TestSqlQueryFloats(); } \
TEST_F(FIXTURE, SqlQueryStrings) { TestSqlQueryStrings(); } \
+ TEST_F(FIXTURE, SqlQueryCancel) { TestSqlQueryCancel(); } \
TEST_F(FIXTURE, SqlQueryErrors) { TestSqlQueryErrors(); } \
+ TEST_F(FIXTURE, SqlSchemaInts) { TestSqlSchemaInts(); } \
+ TEST_F(FIXTURE, SqlSchemaFloats) { TestSqlSchemaFloats(); } \
+ TEST_F(FIXTURE, SqlSchemaStrings) { TestSqlSchemaStrings(); } \
+ TEST_F(FIXTURE, SqlSchemaErrors) { TestSqlSchemaErrors(); } \
TEST_F(FIXTURE, Transactions) { TestTransactions(); } \
TEST_F(FIXTURE, ConcurrentStatements) { TestConcurrentStatements(); } \
TEST_F(FIXTURE, ResultInvalidation) { TestResultInvalidation(); }
diff --git a/go/adbc/drivermgr/adbc.h b/go/adbc/drivermgr/adbc.h
index badfc7d65a..4c13585c08 100644
--- a/go/adbc/drivermgr/adbc.h
+++ b/go/adbc/drivermgr/adbc.h
@@ -1292,7 +1292,8 @@ AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
/// or while consuming an ArrowArrayStream returned from such.
/// Calling this function should make the other functions return
/// ADBC_STATUS_CANCELLED (from ADBC functions) or ECANCELED (from
-/// methods of ArrowArrayStream).
+/// methods of ArrowArrayStream). (It is not guaranteed to, for
+/// instance, the result set may be buffered in memory already.)
///
/// This must always be thread-safe (other operations are not).
///
@@ -1947,7 +1948,8 @@ AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
/// or while consuming an ArrowArrayStream returned from such.
/// Calling this function should make the other functions return
/// ADBC_STATUS_CANCELLED (from ADBC functions) or ECANCELED (from
-/// methods of ArrowArrayStream).
+/// methods of ArrowArrayStream). (It is not guaranteed to, for
+/// instance, the result set may be buffered in memory already.)
///
/// This must always be thread-safe (other operations are not).
///
diff --git a/go/adbc/drivermgr/adbc_driver_manager.cc b/go/adbc/drivermgr/adbc_driver_manager.cc
index 1d35482ac9..ea4ddc4c55 100644
--- a/go/adbc/drivermgr/adbc_driver_manager.cc
+++ b/go/adbc/drivermgr/adbc_driver_manager.cc
@@ -132,23 +132,23 @@ static AdbcStatusCode ReleaseDriver(struct AdbcDriver* driver, struct AdbcError*
AdbcStatusCode DatabaseGetOption(struct AdbcDatabase* database, const char* key,
char* value, size_t* length, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode DatabaseGetOptionBytes(struct AdbcDatabase* database, const char* key,
uint8_t* value, size_t* length,
struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode DatabaseGetOptionInt(struct AdbcDatabase* database, const char* key,
int64_t* value, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode DatabaseGetOptionDouble(struct AdbcDatabase* database, const char* key,
double* value, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode DatabaseSetOption(struct AdbcDatabase* database, const char* key,
@@ -195,24 +195,24 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection*, int, const char*, co
AdbcStatusCode ConnectionGetOption(struct AdbcConnection* connection, const char* key,
char* value, size_t* length, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode ConnectionGetOptionBytes(struct AdbcConnection* connection,
const char* key, uint8_t* value, size_t* length,
struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode ConnectionGetOptionInt(struct AdbcConnection* connection, const char* key,
int64_t* value, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode ConnectionGetOptionDouble(struct AdbcConnection* connection,
const char* key, double* value,
struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode ConnectionGetTableSchema(struct AdbcConnection*, const char*, const char*,
@@ -284,23 +284,23 @@ AdbcStatusCode StatementExecuteSchema(struct AdbcStatement* statement,
AdbcStatusCode StatementGetOption(struct AdbcStatement* statement, const char* key,
char* value, size_t* length, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode StatementGetOptionBytes(struct AdbcStatement* statement, const char* key,
uint8_t* value, size_t* length,
struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode StatementGetOptionInt(struct AdbcStatement* statement, const char* key,
int64_t* value, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode StatementGetOptionDouble(struct AdbcStatement* statement, const char* key,
double* value, struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode StatementGetParameterSchema(struct AdbcStatement* statement,
@@ -535,11 +535,11 @@ AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError*
// So we don't confuse a driver into thinking it's initialized already
database->private_data = nullptr;
if (args->init_func) {
- status = AdbcLoadDriverFromInitFunc(args->init_func, ADBC_VERSION_1_0_0,
+ status = AdbcLoadDriverFromInitFunc(args->init_func, ADBC_VERSION_1_1_0,
database->private_driver, error);
} else {
status = AdbcLoadDriver(args->driver.c_str(), args->entrypoint.c_str(),
- ADBC_VERSION_1_0_0, database->private_driver, error);
+ ADBC_VERSION_1_1_0, database->private_driver, error);
}
if (status != ADBC_STATUS_OK) {
// Restore private_data so it will be released by AdbcDatabaseRelease
@@ -1375,30 +1375,30 @@ AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int vers
auto* driver = reinterpret_cast(raw_driver);
FILL_DEFAULT(driver, DatabaseGetOption);
FILL_DEFAULT(driver, DatabaseGetOptionBytes);
- FILL_DEFAULT(driver, DatabaseGetOptionInt);
FILL_DEFAULT(driver, DatabaseGetOptionDouble);
+ FILL_DEFAULT(driver, DatabaseGetOptionInt);
FILL_DEFAULT(driver, DatabaseSetOptionBytes);
- FILL_DEFAULT(driver, DatabaseSetOptionInt);
FILL_DEFAULT(driver, DatabaseSetOptionDouble);
+ FILL_DEFAULT(driver, DatabaseSetOptionInt);
FILL_DEFAULT(driver, ConnectionCancel);
FILL_DEFAULT(driver, ConnectionGetOption);
FILL_DEFAULT(driver, ConnectionGetOptionBytes);
- FILL_DEFAULT(driver, ConnectionGetOptionInt);
FILL_DEFAULT(driver, ConnectionGetOptionDouble);
+ FILL_DEFAULT(driver, ConnectionGetOptionInt);
FILL_DEFAULT(driver, ConnectionSetOptionBytes);
- FILL_DEFAULT(driver, ConnectionSetOptionInt);
FILL_DEFAULT(driver, ConnectionSetOptionDouble);
+ FILL_DEFAULT(driver, ConnectionSetOptionInt);
FILL_DEFAULT(driver, StatementCancel);
FILL_DEFAULT(driver, StatementExecuteSchema);
FILL_DEFAULT(driver, StatementGetOption);
FILL_DEFAULT(driver, StatementGetOptionBytes);
- FILL_DEFAULT(driver, StatementGetOptionInt);
FILL_DEFAULT(driver, StatementGetOptionDouble);
+ FILL_DEFAULT(driver, StatementGetOptionInt);
FILL_DEFAULT(driver, StatementSetOptionBytes);
- FILL_DEFAULT(driver, StatementSetOptionInt);
FILL_DEFAULT(driver, StatementSetOptionDouble);
+ FILL_DEFAULT(driver, StatementSetOptionInt);
}
return ADBC_STATUS_OK;