From 250e39aaf6bfed261aeb2a83660f6f9c3f42ecc1 Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 26 Jun 2023 12:22:15 -0400 Subject: [PATCH] feat(c/driver/postgresql): implement ADBC 1.1.0 features - ADBC_INFO_DRIVER_ADBC_VERSION - StatementExecuteSchema (#318) - ADBC_CONNECTION_OPTION_CURRENT_{CATALOG, DB_SCHEMA) (#319) - ConnectionCancel/StatementCancel - GetOption/SetOption - Ingest modes --- adbc.h | 6 +- c/driver/common/utils.c | 13 + c/driver/common/utils.h | 3 + c/driver/flightsql/dremio_flightsql_test.cc | 2 +- c/driver/flightsql/sqlite_flightsql_test.cc | 3 +- c/driver/postgresql/connection.cc | 111 +++++- c/driver/postgresql/connection.h | 17 +- c/driver/postgresql/database.h | 12 + c/driver/postgresql/postgres_copy_reader.h | 8 +- c/driver/postgresql/postgresql.cc | 398 ++++++++++++++++++- c/driver/postgresql/postgresql_test.cc | 120 ++++-- c/driver/postgresql/statement.cc | 241 +++++++++--- c/driver/postgresql/statement.h | 22 +- c/driver/snowflake/snowflake_test.cc | 2 +- c/driver/sqlite/sqlite.c | 291 +++++++++++++- c/driver/sqlite/sqlite_test.cc | 20 + c/integration/duckdb/duckdb_test.cc | 2 +- c/validation/adbc_validation.cc | 416 +++++++++++++++++++- c/validation/adbc_validation.h | 86 ++-- go/adbc/drivermgr/adbc.h | 6 +- 20 files changed, 1614 insertions(+), 165 deletions(-) diff --git a/adbc.h b/adbc.h index badfc7d65a..4c13585c08 100644 --- a/adbc.h +++ b/adbc.h @@ -1292,7 +1292,8 @@ AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection, /// or while consuming an ArrowArrayStream returned from such. /// Calling this function should make the other functions return /// ADBC_STATUS_CANCELLED (from ADBC functions) or ECANCELED (from -/// methods of ArrowArrayStream). +/// methods of ArrowArrayStream). (It is not guaranteed to, for +/// instance, the result set may be buffered in memory already.) /// /// This must always be thread-safe (other operations are not). /// @@ -1947,7 +1948,8 @@ AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement, /// or while consuming an ArrowArrayStream returned from such. /// Calling this function should make the other functions return /// ADBC_STATUS_CANCELLED (from ADBC functions) or ECANCELED (from -/// methods of ArrowArrayStream). +/// methods of ArrowArrayStream). (It is not guaranteed to, for +/// instance, the result set may be buffered in memory already.) /// /// This must always be thread-safe (other operations are not). /// diff --git a/c/driver/common/utils.c b/c/driver/common/utils.c index dfac14f5e4..eb01bc18a4 100644 --- a/c/driver/common/utils.c +++ b/c/driver/common/utils.c @@ -244,6 +244,19 @@ AdbcStatusCode AdbcConnectionGetInfoAppendString(struct ArrowArray* array, return ADBC_STATUS_OK; } +AdbcStatusCode AdbcConnectionGetInfoAppendInt(struct ArrowArray* array, + uint32_t info_code, int64_t info_value, + struct AdbcError* error) { + CHECK_NA(INTERNAL, ArrowArrayAppendUInt(array->children[0], info_code), error); + // Append to type variant + CHECK_NA(INTERNAL, ArrowArrayAppendInt(array->children[1]->children[2], info_value), + error); + // Append type code/offset + CHECK_NA(INTERNAL, ArrowArrayFinishUnionElement(array->children[1], /*type_id=*/2), + error); + return ADBC_STATUS_OK; +} + AdbcStatusCode AdbcInitConnectionObjectsSchema(struct ArrowSchema* schema, struct AdbcError* error) { ArrowSchemaInit(schema); diff --git a/c/driver/common/utils.h b/c/driver/common/utils.h index 5735bb945f..381c7b05ee 100644 --- a/c/driver/common/utils.h +++ b/c/driver/common/utils.h @@ -117,6 +117,9 @@ AdbcStatusCode AdbcConnectionGetInfoAppendString(struct ArrowArray* array, uint32_t info_code, const char* info_value, struct AdbcError* error); +AdbcStatusCode AdbcConnectionGetInfoAppendInt(struct ArrowArray* array, + uint32_t info_code, int64_t info_value, + struct AdbcError* error); AdbcStatusCode AdbcInitConnectionObjectsSchema(struct ArrowSchema* schema, struct AdbcError* error); diff --git a/c/driver/flightsql/dremio_flightsql_test.cc b/c/driver/flightsql/dremio_flightsql_test.cc index c128bd49f3..1f38bc7654 100644 --- a/c/driver/flightsql/dremio_flightsql_test.cc +++ b/c/driver/flightsql/dremio_flightsql_test.cc @@ -42,11 +42,11 @@ class DremioFlightSqlQuirks : public adbc_validation::DriverQuirks { } std::string BindParameter(int index) const override { return "?"; } + bool supports_bulk_ingest(const char* /*mode*/) const override { return false; } bool supports_concurrent_statements() const override { return true; } bool supports_transactions() const override { return false; } bool supports_get_sql_info() const override { return false; } bool supports_get_objects() const override { return true; } - bool supports_bulk_ingest() const override { return false; } bool supports_partitioned_data() const override { return true; } bool supports_dynamic_parameter_binding() const override { return false; } }; diff --git a/c/driver/flightsql/sqlite_flightsql_test.cc b/c/driver/flightsql/sqlite_flightsql_test.cc index 043261971a..7ee45a250f 100644 --- a/c/driver/flightsql/sqlite_flightsql_test.cc +++ b/c/driver/flightsql/sqlite_flightsql_test.cc @@ -90,6 +90,8 @@ class SqliteFlightSqlQuirks : public adbc_validation::DriverQuirks { } std::string BindParameter(int index) const override { return "?"; } + + bool supports_bulk_ingest(const char* /*mode*/) const override { return false; } bool supports_concurrent_statements() const override { return true; } bool supports_transactions() const override { return false; } bool supports_get_sql_info() const override { return true; } @@ -113,7 +115,6 @@ class SqliteFlightSqlQuirks : public adbc_validation::DriverQuirks { } } bool supports_get_objects() const override { return true; } - bool supports_bulk_ingest() const override { return false; } bool supports_partitioned_data() const override { return true; } bool supports_dynamic_parameter_binding() const override { return true; } }; diff --git a/c/driver/postgresql/connection.cc b/c/driver/postgresql/connection.cc index 08ff9027c3..1c1b0507e5 100644 --- a/c/driver/postgresql/connection.cc +++ b/c/driver/postgresql/connection.cc @@ -36,8 +36,9 @@ namespace { static const uint32_t kSupportedInfoCodes[] = { - ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION, ADBC_INFO_DRIVER_NAME, - ADBC_INFO_DRIVER_VERSION, ADBC_INFO_DRIVER_ARROW_VERSION, + ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION, + ADBC_INFO_DRIVER_NAME, ADBC_INFO_DRIVER_VERSION, + ADBC_INFO_DRIVER_ARROW_VERSION, ADBC_INFO_DRIVER_ADBC_VERSION, }; static const std::unordered_map kPgTableTypes = { @@ -114,8 +115,10 @@ class PqResultHelper { result_ = PQexecPrepared(conn_, "", param_values_.size(), param_c_strs.data(), NULL, NULL, 0); - if (PQresultStatus(result_) != PGRES_TUPLES_OK) { - SetError(error_, "[libpq] Failed to execute query: %s", PQerrorMessage(conn_)); + ExecStatusType status = PQresultStatus(result_); + if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK) { + SetError(error_, "[libpq] Failed to execute query '%s': %s", query_.c_str(), + PQerrorMessage(conn_)); return ADBC_STATUS_IO; } @@ -729,6 +732,20 @@ class PqGetObjectsHelper { namespace adbcpq { +AdbcStatusCode PostgresConnection::Cancel(struct AdbcError* error) { + // > errbuf must be a char array of size errbufsize (the recommended size is + // > 256 bytes). + // https://www.postgresql.org/docs/current/libpq-cancel.html + char errbuf[256]; + // > The return value is 1 if the cancel request was successfully dispatched + // > and 0 if not. + if (PQcancel(cancel_, errbuf, sizeof(errbuf)) != 1) { + SetError(error, "[libpq] Failed to cancel operation: %s", errbuf); + return ADBC_STATUS_UNKNOWN; + } + return ADBC_STATUS_OK; +} + AdbcStatusCode PostgresConnection::Commit(struct AdbcError* error) { if (autocommit_) { SetError(error, "%s", "[libpq] Cannot commit when autocommit is enabled"); @@ -776,6 +793,10 @@ AdbcStatusCode PostgresConnectionGetInfoImpl(const uint32_t* info_codes, RAISE_ADBC(AdbcConnectionGetInfoAppendString(array, info_codes[i], NANOARROW_VERSION, error)); break; + case ADBC_INFO_DRIVER_ADBC_VERSION: + RAISE_ADBC(AdbcConnectionGetInfoAppendInt(array, info_codes[i], + ADBC_VERSION_1_1_0, error)); + break; default: // Ignore continue; @@ -840,6 +861,47 @@ AdbcStatusCode PostgresConnection::GetObjects( return BatchToArrayStream(&array, &schema, out, error); } +AdbcStatusCode PostgresConnection::GetOption(const char* option, char* value, + size_t* length, struct AdbcError* error) { + std::string output; + if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_CATALOG) == 0) { + output = PQdb(conn_); + } else if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) { + PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA", {}, error}; + RAISE_ADBC(result_helper.Prepare()); + RAISE_ADBC(result_helper.Execute()); + auto it = result_helper.begin(); + if (it == result_helper.end()) { + SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT CURRENT_SCHEMA'"); + return ADBC_STATUS_INTERNAL; + } + output = (*it)[0].data; + } else if (std::strcmp(option, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) { + output = autocommit_ ? ADBC_OPTION_VALUE_ENABLED : ADBC_OPTION_VALUE_DISABLED; + } else { + return ADBC_STATUS_NOT_FOUND; + } + + if (output.size() + 1 <= *length) { + std::memcpy(value, output.c_str(), output.size() + 1); + } + *length = output.size() + 1; + return ADBC_STATUS_OK; +} +AdbcStatusCode PostgresConnection::GetOptionBytes(const char* option, uint8_t* value, + size_t* length, + struct AdbcError* error) { + return ADBC_STATUS_NOT_FOUND; +} +AdbcStatusCode PostgresConnection::GetOptionInt(const char* option, int64_t* value, + struct AdbcError* error) { + return ADBC_STATUS_NOT_FOUND; +} +AdbcStatusCode PostgresConnection::GetOptionDouble(const char* option, double* value, + struct AdbcError* error) { + return ADBC_STATUS_NOT_FOUND; +} + AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog, const char* db_schema, const char* table_name, @@ -964,16 +1026,26 @@ AdbcStatusCode PostgresConnection::GetTableTypes(struct AdbcConnection* connecti AdbcStatusCode PostgresConnection::Init(struct AdbcDatabase* database, struct AdbcError* error) { if (!database || !database->private_data) { - SetError(error, "%s", "[libpq] Must provide an initialized AdbcDatabase"); + SetError(error, "[libpq] Must provide an initialized AdbcDatabase"); return ADBC_STATUS_INVALID_ARGUMENT; } database_ = *reinterpret_cast*>(database->private_data); type_resolver_ = database_->type_resolver(); - return database_->Connect(&conn_, error); + RAISE_ADBC(database_->Connect(&conn_, error)); + cancel_ = PQgetCancel(conn_); + if (!cancel_) { + SetError(error, "[libpq] Could not initialize PGcancel"); + return ADBC_STATUS_UNKNOWN; + } + return ADBC_STATUS_OK; } AdbcStatusCode PostgresConnection::Release(struct AdbcError* error) { + if (cancel_) { + PQfreeCancel(cancel_); + cancel_ = nullptr; + } if (conn_) { return database_->Disconnect(&conn_, error); } @@ -1023,8 +1095,35 @@ AdbcStatusCode PostgresConnection::SetOption(const char* key, const char* value, autocommit_ = autocommit; } return ADBC_STATUS_OK; + } else if (std::strcmp(key, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) { + // PostgreSQL doesn't accept a parameter here + PqResultHelper result_helper{ + conn_, std::string("SET search_path TO ") + value, {}, error}; + RAISE_ADBC(result_helper.Prepare()); + RAISE_ADBC(result_helper.Execute()); + return ADBC_STATUS_OK; } SetError(error, "%s%s", "[libpq] Unknown option ", key); return ADBC_STATUS_NOT_IMPLEMENTED; } + +AdbcStatusCode PostgresConnection::SetOptionBytes(const char* key, const uint8_t* value, + size_t length, + struct AdbcError* error) { + SetError(error, "%s%s", "[libpq] Unknown option ", key); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode PostgresConnection::SetOptionDouble(const char* key, double value, + struct AdbcError* error) { + SetError(error, "%s%s", "[libpq] Unknown option ", key); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode PostgresConnection::SetOptionInt(const char* key, int64_t value, + struct AdbcError* error) { + SetError(error, "%s%s", "[libpq] Unknown option ", key); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + } // namespace adbcpq diff --git a/c/driver/postgresql/connection.h b/c/driver/postgresql/connection.h index 74315ee053..a990056424 100644 --- a/c/driver/postgresql/connection.h +++ b/c/driver/postgresql/connection.h @@ -29,8 +29,10 @@ namespace adbcpq { class PostgresDatabase; class PostgresConnection { public: - PostgresConnection() : database_(nullptr), conn_(nullptr), autocommit_(true) {} + PostgresConnection() + : database_(nullptr), conn_(nullptr), cancel_(nullptr), autocommit_(true) {} + AdbcStatusCode Cancel(struct AdbcError* error); AdbcStatusCode Commit(struct AdbcError* error); AdbcStatusCode GetInfo(struct AdbcConnection* connection, uint32_t* info_codes, size_t info_codes_length, struct ArrowArrayStream* out, @@ -40,6 +42,14 @@ class PostgresConnection { const char* table_name, const char** table_types, const char* column_name, struct ArrowArrayStream* out, struct AdbcError* error); + AdbcStatusCode GetOption(const char* option, char* value, size_t* length, + struct AdbcError* error); + AdbcStatusCode GetOptionBytes(const char* option, uint8_t* value, size_t* length, + struct AdbcError* error); + AdbcStatusCode GetOptionDouble(const char* option, double* value, + struct AdbcError* error); + AdbcStatusCode GetOptionInt(const char* option, int64_t* value, + struct AdbcError* error); AdbcStatusCode GetTableSchema(const char* catalog, const char* db_schema, const char* table_name, struct ArrowSchema* schema, struct AdbcError* error); @@ -49,6 +59,10 @@ class PostgresConnection { AdbcStatusCode Release(struct AdbcError* error); AdbcStatusCode Rollback(struct AdbcError* error); AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error); + AdbcStatusCode SetOptionBytes(const char* key, const uint8_t* value, size_t length, + struct AdbcError* error); + AdbcStatusCode SetOptionDouble(const char* key, double value, struct AdbcError* error); + AdbcStatusCode SetOptionInt(const char* key, int64_t value, struct AdbcError* error); PGconn* conn() const { return conn_; } const std::shared_ptr& type_resolver() const { @@ -60,6 +74,7 @@ class PostgresConnection { std::shared_ptr database_; std::shared_ptr type_resolver_; PGconn* conn_; + PGcancel* cancel_; bool autocommit_; }; } // namespace adbcpq diff --git a/c/driver/postgresql/database.h b/c/driver/postgresql/database.h index f10464787a..9269b1958b 100644 --- a/c/driver/postgresql/database.h +++ b/c/driver/postgresql/database.h @@ -36,7 +36,19 @@ class PostgresDatabase { AdbcStatusCode Init(struct AdbcError* error); AdbcStatusCode Release(struct AdbcError* error); + AdbcStatusCode GetOption(const char* option, char* value, size_t* length, + struct AdbcError* error); + AdbcStatusCode GetOptionBytes(const char* option, uint8_t* value, size_t* length, + struct AdbcError* error); + AdbcStatusCode GetOptionDouble(const char* option, double* value, + struct AdbcError* error); + AdbcStatusCode GetOptionInt(const char* option, int64_t* value, + struct AdbcError* error); AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error); + AdbcStatusCode SetOptionBytes(const char* key, const uint8_t* value, size_t length, + struct AdbcError* error); + AdbcStatusCode SetOptionDouble(const char* key, double value, struct AdbcError* error); + AdbcStatusCode SetOptionInt(const char* key, int64_t value, struct AdbcError* error); // Internal implementation diff --git a/c/driver/postgresql/postgres_copy_reader.h b/c/driver/postgresql/postgres_copy_reader.h index 7d3844f86b..75aa5db7ed 100644 --- a/c/driver/postgresql/postgres_copy_reader.h +++ b/c/driver/postgresql/postgres_copy_reader.h @@ -843,12 +843,13 @@ static inline ArrowErrorCode MakeCopyFieldReader(const PostgresType& pg_type, class PostgresCopyStreamReader { public: - ArrowErrorCode Init(const PostgresType& pg_type) { + ArrowErrorCode Init(PostgresType pg_type) { if (pg_type.type_id() != PostgresTypeId::kRecord) { return EINVAL; } - root_reader_.Init(pg_type); + pg_type_ = std::move(pg_type); + root_reader_.Init(pg_type_); array_size_approx_bytes_ = 0; return NANOARROW_OK; } @@ -972,7 +973,10 @@ class PostgresCopyStreamReader { return NANOARROW_OK; } + const PostgresType& pg_type() const { return pg_type_; } + private: + PostgresType pg_type_; PostgresCopyFieldTupleReader root_reader_; nanoarrow::UniqueSchema schema_; nanoarrow::UniqueArray array_; diff --git a/c/driver/postgresql/postgresql.cc b/c/driver/postgresql/postgresql.cc index 95e6c8b881..9f6730eb4f 100644 --- a/c/driver/postgresql/postgresql.cc +++ b/c/driver/postgresql/postgresql.cc @@ -83,14 +83,92 @@ AdbcStatusCode PostgresDatabaseRelease(struct AdbcDatabase* database, return status; } +AdbcStatusCode PostgresDatabaseGetOption(struct AdbcDatabase* database, const char* key, + char* value, size_t* length, + struct AdbcError* error) { + if (!database->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = reinterpret_cast*>(database->private_data); + return (*ptr)->GetOption(key, value, length, error); +} + +AdbcStatusCode PostgresDatabaseGetOptionBytes(struct AdbcDatabase* database, + const char* key, uint8_t* value, + size_t* length, struct AdbcError* error) { + if (!database->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = reinterpret_cast*>(database->private_data); + return (*ptr)->GetOptionBytes(key, value, length, error); +} + +AdbcStatusCode PostgresDatabaseGetOptionDouble(struct AdbcDatabase* database, + const char* key, double* value, + struct AdbcError* error) { + if (!database->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = reinterpret_cast*>(database->private_data); + return (*ptr)->GetOptionDouble(key, value, error); +} + +AdbcStatusCode PostgresDatabaseGetOptionInt(struct AdbcDatabase* database, + const char* key, int64_t* value, + struct AdbcError* error) { + if (!database->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = reinterpret_cast*>(database->private_data); + return (*ptr)->GetOptionInt(key, value, error); +} + AdbcStatusCode PostgresDatabaseSetOption(struct AdbcDatabase* database, const char* key, const char* value, struct AdbcError* error) { if (!database || !database->private_data) return ADBC_STATUS_INVALID_STATE; auto ptr = reinterpret_cast*>(database->private_data); return (*ptr)->SetOption(key, value, error); } + +AdbcStatusCode PostgresDatabaseSetOptionBytes(struct AdbcDatabase* database, + const char* key, const uint8_t* value, + size_t length, struct AdbcError* error) { + if (!database->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = reinterpret_cast*>(database->private_data); + return (*ptr)->SetOptionBytes(key, value, length, error); +} + +AdbcStatusCode PostgresDatabaseSetOptionDouble(struct AdbcDatabase* database, + const char* key, double value, + struct AdbcError* error) { + if (!database->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = reinterpret_cast*>(database->private_data); + return (*ptr)->SetOptionDouble(key, value, error); +} + +AdbcStatusCode PostgresDatabaseSetOptionInt(struct AdbcDatabase* database, + const char* key, int64_t value, + struct AdbcError* error) { + if (!database->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = reinterpret_cast*>(database->private_data); + return (*ptr)->SetOptionInt(key, value, error); +} } // namespace +AdbcStatusCode AdbcDatabaseGetOption(struct AdbcDatabase* database, const char* key, + char* value, size_t* length, + struct AdbcError* error) { + return PostgresDatabaseGetOption(database, key, value, length, error); +} + +AdbcStatusCode AdbcDatabaseGetOptionBytes(struct AdbcDatabase* database, const char* key, + uint8_t* value, size_t* length, + struct AdbcError* error) { + return PostgresDatabaseGetOptionBytes(database, key, value, length, error); +} + +AdbcStatusCode AdbcDatabaseGetOptionInt(struct AdbcDatabase* database, const char* key, + int64_t* value, struct AdbcError* error) { + return PostgresDatabaseGetOptionInt(database, key, value, error); +} + +AdbcStatusCode AdbcDatabaseGetOptionDouble(struct AdbcDatabase* database, const char* key, + double* value, struct AdbcError* error) { + return PostgresDatabaseGetOptionDouble(database, key, value, error); +} + AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) { return PostgresDatabaseInit(database, error); } @@ -109,10 +187,34 @@ AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char* return PostgresDatabaseSetOption(database, key, value, error); } +AdbcStatusCode AdbcDatabaseSetOptionBytes(struct AdbcDatabase* database, const char* key, + const uint8_t* value, size_t length, + struct AdbcError* error) { + return PostgresDatabaseSetOptionBytes(database, key, value, length, error); +} + +AdbcStatusCode AdbcDatabaseSetOptionInt(struct AdbcDatabase* database, const char* key, + int64_t value, struct AdbcError* error) { + return PostgresDatabaseSetOptionInt(database, key, value, error); +} + +AdbcStatusCode AdbcDatabaseSetOptionDouble(struct AdbcDatabase* database, const char* key, + double value, struct AdbcError* error) { + return PostgresDatabaseSetOptionDouble(database, key, value, error); +} + // --------------------------------------------------------------------- // AdbcConnection namespace { +AdbcStatusCode PostgresConnectionCancel(struct AdbcConnection* connection, + struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->Cancel(error); +} + AdbcStatusCode PostgresConnectionCommit(struct AdbcConnection* connection, struct AdbcError* error) { if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; @@ -142,6 +244,42 @@ AdbcStatusCode PostgresConnectionGetObjects( table_types, column_name, stream, error); } +AdbcStatusCode PostgresConnectionGetOption(struct AdbcConnection* connection, + const char* key, char* value, size_t* length, + struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->GetOption(key, value, length, error); +} + +AdbcStatusCode PostgresConnectionGetOptionBytes(struct AdbcConnection* connection, + const char* key, uint8_t* value, + size_t* length, struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->GetOptionBytes(key, value, length, error); +} + +AdbcStatusCode PostgresConnectionGetOptionDouble(struct AdbcConnection* connection, + const char* key, double* value, + struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->GetOptionDouble(key, value, error); +} + +AdbcStatusCode PostgresConnectionGetOptionInt(struct AdbcConnection* connection, + const char* key, int64_t* value, + struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->GetOptionInt(key, value, error); +} + AdbcStatusCode PostgresConnectionGetTableSchema( struct AdbcConnection* connection, const char* catalog, const char* db_schema, const char* table_name, struct ArrowSchema* schema, struct AdbcError* error) { @@ -213,7 +351,40 @@ AdbcStatusCode PostgresConnectionSetOption(struct AdbcConnection* connection, return (*ptr)->SetOption(key, value, error); } +AdbcStatusCode PostgresConnectionSetOptionBytes(struct AdbcConnection* connection, + const char* key, const uint8_t* value, + size_t length, struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->SetOptionBytes(key, value, length, error); +} + +AdbcStatusCode PostgresConnectionSetOptionDouble(struct AdbcConnection* connection, + const char* key, double value, + struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->SetOptionDouble(key, value, error); +} + +AdbcStatusCode PostgresConnectionSetOptionInt(struct AdbcConnection* connection, + const char* key, int64_t value, + struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->SetOptionInt(key, value, error); +} + } // namespace + +AdbcStatusCode AdbcConnectionCancel(struct AdbcConnection* connection, + struct AdbcError* error) { + return PostgresConnectionCancel(connection, error); +} + AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection, struct AdbcError* error) { return PostgresConnectionCommit(connection, error); @@ -237,6 +408,30 @@ AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int d table_types, column_name, stream, error); } +AdbcStatusCode AdbcConnectionGetOption(struct AdbcConnection* connection, const char* key, + char* value, size_t* length, + struct AdbcError* error) { + return PostgresConnectionGetOption(connection, key, value, length, error); +} + +AdbcStatusCode AdbcConnectionGetOptionBytes(struct AdbcConnection* connection, + const char* key, uint8_t* value, + size_t* length, struct AdbcError* error) { + return PostgresConnectionGetOptionBytes(connection, key, value, length, error); +} + +AdbcStatusCode AdbcConnectionGetOptionInt(struct AdbcConnection* connection, + const char* key, int64_t* value, + struct AdbcError* error) { + return PostgresConnectionGetOptionInt(connection, key, value, error); +} + +AdbcStatusCode AdbcConnectionGetOptionDouble(struct AdbcConnection* connection, + const char* key, double* value, + struct AdbcError* error) { + return PostgresConnectionGetOptionDouble(connection, key, value, error); +} + AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection, const char* catalog, const char* db_schema, const char* table_name, @@ -287,6 +482,24 @@ AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const return PostgresConnectionSetOption(connection, key, value, error); } +AdbcStatusCode AdbcConnectionSetOptionBytes(struct AdbcConnection* connection, + const char* key, const uint8_t* value, + size_t length, struct AdbcError* error) { + return PostgresConnectionSetOptionBytes(connection, key, value, length, error); +} + +AdbcStatusCode AdbcConnectionSetOptionInt(struct AdbcConnection* connection, + const char* key, int64_t value, + struct AdbcError* error) { + return PostgresConnectionSetOptionInt(connection, key, value, error); +} + +AdbcStatusCode AdbcConnectionSetOptionDouble(struct AdbcConnection* connection, + const char* key, double value, + struct AdbcError* error) { + return PostgresConnectionSetOptionDouble(connection, key, value, error); +} + // --------------------------------------------------------------------- // AdbcStatement @@ -310,6 +523,14 @@ AdbcStatusCode PostgresStatementBindStream(struct AdbcStatement* statement, return (*ptr)->Bind(stream, error); } +AdbcStatusCode PostgresStatementCancel(struct AdbcStatement* statement, + struct AdbcError* error) { + if (!statement->private_data) return ADBC_STATUS_INVALID_STATE; + auto* ptr = + reinterpret_cast*>(statement->private_data); + return (*ptr)->Cancel(error); +} + AdbcStatusCode PostgresStatementExecutePartitions(struct AdbcStatement* statement, struct ArrowSchema* schema, struct AdbcPartitions* partitions, @@ -329,16 +550,49 @@ AdbcStatusCode PostgresStatementExecuteQuery(struct AdbcStatement* statement, return (*ptr)->ExecuteQuery(output, rows_affected, error); } -AdbcStatusCode PostgresStatementGetPartitionDesc(struct AdbcStatement* statement, - uint8_t* partition_desc, - struct AdbcError* error) { - return ADBC_STATUS_NOT_IMPLEMENTED; +AdbcStatusCode PostgresStatementExecuteSchema(struct AdbcStatement* statement, + struct ArrowSchema* schema, + struct AdbcError* error) { + if (!statement->private_data) return ADBC_STATUS_INVALID_STATE; + auto* ptr = + reinterpret_cast*>(statement->private_data); + return (*ptr)->ExecuteSchema(schema, error); } -AdbcStatusCode PostgresStatementGetPartitionDescSize(struct AdbcStatement* statement, - size_t* length, - struct AdbcError* error) { - return ADBC_STATUS_NOT_IMPLEMENTED; +AdbcStatusCode PostgresStatementGetOption(struct AdbcStatement* statement, + const char* key, char* value, size_t* length, + struct AdbcError* error) { + if (!statement->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(statement->private_data); + return (*ptr)->GetOption(key, value, length, error); +} + +AdbcStatusCode PostgresStatementGetOptionBytes(struct AdbcStatement* statement, + const char* key, uint8_t* value, + size_t* length, struct AdbcError* error) { + if (!statement->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(statement->private_data); + return (*ptr)->GetOptionBytes(key, value, length, error); +} + +AdbcStatusCode PostgresStatementGetOptionDouble(struct AdbcStatement* statement, + const char* key, double* value, + struct AdbcError* error) { + if (!statement->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(statement->private_data); + return (*ptr)->GetOptionDouble(key, value, error); +} + +AdbcStatusCode PostgresStatementGetOptionInt(struct AdbcStatement* statement, + const char* key, int64_t* value, + struct AdbcError* error) { + if (!statement->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(statement->private_data); + return (*ptr)->GetOptionInt(key, value, error); } AdbcStatusCode PostgresStatementGetParameterSchema(struct AdbcStatement* statement, @@ -386,6 +640,33 @@ AdbcStatusCode PostgresStatementSetOption(struct AdbcStatement* statement, return (*ptr)->SetOption(key, value, error); } +AdbcStatusCode PostgresStatementSetOptionBytes(struct AdbcStatement* statement, + const char* key, const uint8_t* value, + size_t length, struct AdbcError* error) { + if (!statement->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(statement->private_data); + return (*ptr)->SetOptionBytes(key, value, length, error); +} + +AdbcStatusCode PostgresStatementSetOptionDouble(struct AdbcStatement* statement, + const char* key, double value, + struct AdbcError* error) { + if (!statement->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(statement->private_data); + return (*ptr)->SetOptionDouble(key, value, error); +} + +AdbcStatusCode PostgresStatementSetOptionInt(struct AdbcStatement* statement, + const char* key, int64_t value, + struct AdbcError* error) { + if (!statement->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(statement->private_data); + return (*ptr)->SetOptionInt(key, value, error); +} + AdbcStatusCode PostgresStatementSetSqlQuery(struct AdbcStatement* statement, const char* query, struct AdbcError* error) { if (!statement->private_data) return ADBC_STATUS_INVALID_STATE; @@ -407,6 +688,11 @@ AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement, return PostgresStatementBindStream(statement, stream, error); } +AdbcStatusCode AdbcStatementCancel(struct AdbcStatement* statement, + struct AdbcError* error) { + return PostgresStatementCancel(statement, error); +} + AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement, ArrowSchema* schema, struct AdbcPartitions* partitions, @@ -423,16 +709,32 @@ AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement, return PostgresStatementExecuteQuery(statement, output, rows_affected, error); } -AdbcStatusCode AdbcStatementGetPartitionDesc(struct AdbcStatement* statement, - uint8_t* partition_desc, - struct AdbcError* error) { - return PostgresStatementGetPartitionDesc(statement, partition_desc, error); +AdbcStatusCode AdbcStatementExecuteSchema(struct AdbcStatement* statement, + ArrowSchema* schema, struct AdbcError* error) { + return PostgresStatementExecuteSchema(statement, schema, error); } -AdbcStatusCode AdbcStatementGetPartitionDescSize(struct AdbcStatement* statement, - size_t* length, - struct AdbcError* error) { - return PostgresStatementGetPartitionDescSize(statement, length, error); +AdbcStatusCode AdbcStatementGetOption(struct AdbcStatement* statement, const char* key, + char* value, size_t* length, + struct AdbcError* error) { + return PostgresStatementGetOption(statement, key, value, length, error); +} + +AdbcStatusCode AdbcStatementGetOptionBytes(struct AdbcStatement* statement, + const char* key, uint8_t* value, + size_t* length, struct AdbcError* error) { + return PostgresStatementGetOptionBytes(statement, key, value, length, error); +} + +AdbcStatusCode AdbcStatementGetOptionInt(struct AdbcStatement* statement, const char* key, + int64_t* value, struct AdbcError* error) { + return PostgresStatementGetOptionInt(statement, key, value, error); +} + +AdbcStatusCode AdbcStatementGetOptionDouble(struct AdbcStatement* statement, + const char* key, double* value, + struct AdbcError* error) { + return PostgresStatementGetOptionDouble(statement, key, value, error); } AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement, @@ -462,6 +764,23 @@ AdbcStatusCode AdbcStatementSetOption(struct AdbcStatement* statement, const cha return PostgresStatementSetOption(statement, key, value, error); } +AdbcStatusCode AdbcStatementSetOptionBytes(struct AdbcStatement* statement, + const char* key, const uint8_t* value, + size_t length, struct AdbcError* error) { + return PostgresStatementSetOptionBytes(statement, key, value, length, error); +} + +AdbcStatusCode AdbcStatementSetOptionInt(struct AdbcStatement* statement, const char* key, + int64_t value, struct AdbcError* error) { + return PostgresStatementSetOptionInt(statement, key, value, error); +} + +AdbcStatusCode AdbcStatementSetOptionDouble(struct AdbcStatement* statement, + const char* key, double value, + struct AdbcError* error) { + return PostgresStatementSetOptionDouble(statement, key, value, error); +} + AdbcStatusCode AdbcStatementSetSqlQuery(struct AdbcStatement* statement, const char* query, struct AdbcError* error) { return PostgresStatementSetSqlQuery(statement, query, error); @@ -469,12 +788,47 @@ AdbcStatusCode AdbcStatementSetSqlQuery(struct AdbcStatement* statement, extern "C" { ADBC_EXPORT -AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* error) { - if (version != ADBC_VERSION_1_0_0) return ADBC_STATUS_NOT_IMPLEMENTED; +AdbcStatusCode PostgresqlDriverInit(int version, void* raw_driver, + struct AdbcError* error) { + if (version != ADBC_VERSION_1_0_0 && version != ADBC_VERSION_1_1_0) { + return ADBC_STATUS_NOT_IMPLEMENTED; + } if (!raw_driver) return ADBC_STATUS_INVALID_ARGUMENT; auto* driver = reinterpret_cast(raw_driver); - std::memset(driver, 0, ADBC_DRIVER_1_0_0_SIZE); + if (version >= ADBC_VERSION_1_1_0) { + std::memset(driver, 0, ADBC_DRIVER_1_1_0_SIZE); + + driver->DatabaseGetOption = PostgresDatabaseGetOption; + driver->DatabaseGetOptionBytes = PostgresDatabaseGetOptionBytes; + driver->DatabaseGetOptionDouble = PostgresDatabaseGetOptionDouble; + driver->DatabaseGetOptionInt = PostgresDatabaseGetOptionInt; + driver->DatabaseSetOptionBytes = PostgresDatabaseSetOptionBytes; + driver->DatabaseSetOptionDouble = PostgresDatabaseSetOptionDouble; + driver->DatabaseSetOptionInt = PostgresDatabaseSetOptionInt; + + driver->ConnectionCancel = PostgresConnectionCancel; + driver->ConnectionGetOption = PostgresConnectionGetOption; + driver->ConnectionGetOptionBytes = PostgresConnectionGetOptionBytes; + driver->ConnectionGetOptionDouble = PostgresConnectionGetOptionDouble; + driver->ConnectionGetOptionInt = PostgresConnectionGetOptionInt; + driver->ConnectionSetOptionBytes = PostgresConnectionSetOptionBytes; + driver->ConnectionSetOptionDouble = PostgresConnectionSetOptionDouble; + driver->ConnectionSetOptionInt = PostgresConnectionSetOptionInt; + + driver->StatementCancel = PostgresStatementCancel; + driver->StatementExecuteSchema = PostgresStatementExecuteSchema; + driver->StatementGetOption = PostgresStatementGetOption; + driver->StatementGetOptionBytes = PostgresStatementGetOptionBytes; + driver->StatementGetOptionDouble = PostgresStatementGetOptionDouble; + driver->StatementGetOptionInt = PostgresStatementGetOptionInt; + driver->StatementSetOptionBytes = PostgresStatementSetOptionBytes; + driver->StatementSetOptionDouble = PostgresStatementSetOptionDouble; + driver->StatementSetOptionInt = PostgresStatementSetOptionInt; + } else { + std::memset(driver, 0, ADBC_DRIVER_1_0_0_SIZE); + } + driver->DatabaseInit = PostgresDatabaseInit; driver->DatabaseNew = PostgresDatabaseNew; driver->DatabaseRelease = PostgresDatabaseRelease; @@ -502,6 +856,12 @@ AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* e driver->StatementRelease = PostgresStatementRelease; driver->StatementSetOption = PostgresStatementSetOption; driver->StatementSetSqlQuery = PostgresStatementSetSqlQuery; + return ADBC_STATUS_OK; } + +ADBC_EXPORT +AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* error) { + return PostgresqlDriverInit(version, raw_driver, error); +} } diff --git a/c/driver/postgresql/postgresql_test.cc b/c/driver/postgresql/postgresql_test.cc index 33115bcf8b..d646ded8d7 100644 --- a/c/driver/postgresql/postgresql_test.cc +++ b/c/driver/postgresql/postgresql_test.cc @@ -103,6 +103,28 @@ class PostgresQuirks : public adbc_validation::DriverQuirks { std::string catalog() const override { return "postgres"; } std::string db_schema() const override { return "public"; } + + bool supports_cancel() const override { return true; } + bool supports_execute_schema() const override { return true; } + std::optional supports_get_sql_info( + uint32_t info_code) const override { + switch (info_code) { + case ADBC_INFO_DRIVER_ADBC_VERSION: + return ADBC_VERSION_1_1_0; + case ADBC_INFO_DRIVER_NAME: + return "ADBC PostgreSQL Driver"; + case ADBC_INFO_DRIVER_VERSION: + return "(unknown)"; + case ADBC_INFO_VENDOR_NAME: + return "PostgreSQL"; + case ADBC_INFO_VENDOR_VERSION: + return "150002"; + default: + return std::nullopt; + } + } + bool supports_metadata_current_catalog() const override { return true; } + bool supports_metadata_current_db_schema() const override { return true; } }; class PostgresDatabaseTest : public ::testing::Test, @@ -134,10 +156,8 @@ TEST_F(PostgresConnectionTest, GetInfoMetadata) { adbc_validation::StreamReader reader; std::vector info = { - ADBC_INFO_DRIVER_NAME, - ADBC_INFO_DRIVER_VERSION, - ADBC_INFO_VENDOR_NAME, - ADBC_INFO_VENDOR_VERSION, + ADBC_INFO_DRIVER_NAME, ADBC_INFO_DRIVER_VERSION, ADBC_INFO_DRIVER_ADBC_VERSION, + ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION, }; ASSERT_THAT(AdbcConnectionGetInfo(&connection, info.data(), info.size(), &reader.stream.value, &error), @@ -153,29 +173,30 @@ TEST_F(PostgresConnectionTest, GetInfoMetadata) { ASSERT_FALSE(ArrowArrayViewIsNull(reader.array_view->children[0], row)); const uint32_t code = reader.array_view->children[0]->buffer_views[1].data.as_uint32[row]; + const uint32_t offset = + reader.array_view->children[1]->buffer_views[1].data.as_int32[row]; seen.push_back(code); - int str_child_index = 0; - struct ArrowArrayView* str_child = - reader.array_view->children[1]->children[str_child_index]; + struct ArrowArrayView* str_child = reader.array_view->children[1]->children[0]; + struct ArrowArrayView* int_child = reader.array_view->children[1]->children[2]; switch (code) { case ADBC_INFO_DRIVER_NAME: { - ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, 0); + ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, offset); EXPECT_EQ("ADBC PostgreSQL Driver", std::string(val.data, val.size_bytes)); break; } case ADBC_INFO_DRIVER_VERSION: { - ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, 1); + ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, offset); EXPECT_EQ("(unknown)", std::string(val.data, val.size_bytes)); break; } case ADBC_INFO_VENDOR_NAME: { - ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, 2); + ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, offset); EXPECT_EQ("PostgreSQL", std::string(val.data, val.size_bytes)); break; } case ADBC_INFO_VENDOR_VERSION: { - ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, 3); + ArrowStringView val = ArrowArrayViewGetStringUnsafe(str_child, offset); #ifdef __WIN32 const char* pater = "\\d\\d\\d\\d\\d\\d"; #else @@ -185,6 +206,10 @@ TEST_F(PostgresConnectionTest, GetInfoMetadata) { ::testing::MatchesRegex(pater)); break; } + case ADBC_INFO_DRIVER_ADBC_VERSION: { + EXPECT_EQ(ADBC_VERSION_1_1_0, ArrowArrayViewGetIntUnsafe(int_child, offset)); + break; + } default: // Ignored break; @@ -198,10 +223,6 @@ TEST_F(PostgresConnectionTest, GetObjectsGetCatalogs) { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); - if (!quirks()->supports_get_objects()) { - GTEST_SKIP(); - } - adbc_validation::StreamReader reader; ASSERT_THAT( AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_CATALOGS, nullptr, nullptr, @@ -228,10 +249,6 @@ TEST_F(PostgresConnectionTest, GetObjectsGetDbSchemas) { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); - if (!quirks()->supports_get_objects()) { - GTEST_SKIP(); - } - adbc_validation::StreamReader reader; ASSERT_THAT(AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_DB_SCHEMAS, nullptr, nullptr, nullptr, nullptr, nullptr, @@ -255,10 +272,6 @@ TEST_F(PostgresConnectionTest, GetObjectsGetAllFindsPrimaryKey) { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); - if (!quirks()->supports_get_objects()) { - GTEST_SKIP(); - } - ASSERT_THAT(quirks()->DropTable(&connection, "adbc_pkey_test", &error), IsOkStatus(&error)); @@ -329,10 +342,6 @@ TEST_F(PostgresConnectionTest, GetObjectsGetAllFindsForeignKey) { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); - if (!quirks()->supports_get_objects()) { - GTEST_SKIP(); - } - ASSERT_THAT(quirks()->DropTable(&connection, "adbc_fkey_test", &error), IsOkStatus(&error)); ASSERT_THAT(quirks()->DropTable(&connection, "adbc_fkey_test_base", &error), @@ -450,10 +459,6 @@ TEST_F(PostgresConnectionTest, GetObjectsTableTypesFilter) { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); - if (!quirks()->supports_get_objects()) { - GTEST_SKIP(); - } - ASSERT_THAT(quirks()->DropView(&connection, "adbc_table_types_view_test", &error), IsOkStatus(&error)); ASSERT_THAT(quirks()->DropTable(&connection, "adbc_table_types_table_test", &error), @@ -516,7 +521,7 @@ TEST_F(PostgresConnectionTest, GetObjectsTableTypesFilter) { } TEST_F(PostgresConnectionTest, MetadataGetTableSchemaInjection) { - if (!quirks()->supports_bulk_ingest()) { + if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) { GTEST_SKIP(); } ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); @@ -549,6 +554,57 @@ TEST_F(PostgresConnectionTest, MetadataGetTableSchemaInjection) { {"strings", NANOARROW_TYPE_STRING, true}})); } +TEST_F(PostgresConnectionTest, MetadataSetCurrentDbSchema) { + ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); + + { + adbc_validation::Handle statement; + ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error), + IsOkStatus(&error)); + + ASSERT_THAT(AdbcStatementSetSqlQuery( + &statement.value, "CREATE SCHEMA IF NOT EXISTS testschema", &error), + IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error), + IsOkStatus(&error)); + + ASSERT_THAT( + AdbcStatementSetSqlQuery( + &statement.value, + "CREATE TABLE IF NOT EXISTS testschema.schematable (ints INT)", &error), + IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error), + IsOkStatus(&error)); + + ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error)); + } + + adbc_validation::Handle statement; + ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error), + IsOkStatus(&error)); + + // Table does not exist in this schema + ASSERT_THAT( + AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM schematable", &error), + IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error), + IsStatus(ADBC_STATUS_IO, &error)); + + ASSERT_THAT( + AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA, + "testschema", &error), + IsOkStatus(&error)); + + ASSERT_THAT( + AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM schematable", &error), + IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error), + IsOkStatus(&error)); + + ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error)); +} + ADBCV_TEST_CONNECTION(PostgresConnectionTest) class PostgresStatementTest : public ::testing::Test, diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc index 4cae15b631..fe14bac6ef 100644 --- a/c/driver/postgresql/statement.cc +++ b/c/driver/postgresql/statement.cc @@ -746,11 +746,41 @@ AdbcStatusCode PostgresStatement::Bind(struct ArrowArrayStream* stream, return ADBC_STATUS_OK; } +AdbcStatusCode PostgresStatement::Cancel(struct AdbcError* error) { + // Ultimately the same underlying PGconn + return connection_->Cancel(error); +} + AdbcStatusCode PostgresStatement::CreateBulkTable( const struct ArrowSchema& source_schema, const std::vector& source_schema_fields, struct AdbcError* error) { std::string create = "CREATE TABLE "; + switch (ingest_.mode) { + case IngestMode::kCreate: + // Nothing to do + break; + case IngestMode::kAppend: + return ADBC_STATUS_OK; + case IngestMode::kReplace: { + std::string drop = "DROP TABLE IF EXISTS " + ingest_.target; + PGresult* result = PQexecParams(connection_->conn(), drop.c_str(), /*nParams=*/0, + /*paramTypes=*/nullptr, /*paramValues=*/nullptr, + /*paramLengths=*/nullptr, /*paramFormats=*/nullptr, + /*resultFormat=*/1 /*(binary)*/); + if (PQresultStatus(result) != PGRES_COMMAND_OK) { + SetError(error, "[libpq] Failed to drop table: %s\nQuery was: %s", + PQerrorMessage(connection_->conn()), drop.c_str()); + PQclear(result); + return ADBC_STATUS_IO; + } + PQclear(result); + break; + } + case IngestMode::kCreateAppend: + create += "IF NOT EXISTS "; + break; + } create += ingest_.target; create += " ("; @@ -867,50 +897,12 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct ArrowArrayStream* stream, // 1. Prepare the query to get the schema { - // TODO: we should pipeline here and assume this will succeed - PGresult* result = PQprepare(connection_->conn(), /*stmtName=*/"", query_.c_str(), - /*nParams=*/0, nullptr); - if (PQresultStatus(result) != PGRES_COMMAND_OK) { - SetError(error, - "[libpq] Failed to execute query: could not infer schema: failed to " - "prepare query: %s\nQuery was:%s", - PQerrorMessage(connection_->conn()), query_.c_str()); - PQclear(result); - return ADBC_STATUS_IO; - } - PQclear(result); - result = PQdescribePrepared(connection_->conn(), /*stmtName=*/""); - if (PQresultStatus(result) != PGRES_COMMAND_OK) { - SetError(error, - "[libpq] Failed to execute query: could not infer schema: failed to " - "describe prepared statement: %s\nQuery was:%s", - PQerrorMessage(connection_->conn()), query_.c_str()); - PQclear(result); - return ADBC_STATUS_IO; - } - - // Resolve the information from the PGresult into a PostgresType - PostgresType root_type; - AdbcStatusCode status = - ResolvePostgresType(*type_resolver_, result, &root_type, error); - PQclear(result); - if (status != ADBC_STATUS_OK) return status; - - // Initialize the copy reader and infer the output schema (i.e., error for - // unsupported types before issuing the COPY query) - reader_.copy_reader_.reset(new PostgresCopyStreamReader()); - reader_.copy_reader_->Init(root_type); - struct ArrowError na_error; - int na_res = reader_.copy_reader_->InferOutputSchema(&na_error); - if (na_res != NANOARROW_OK) { - SetError(error, "[libpq] Failed to infer output schema: %s", na_error.message); - return na_res; - } + RAISE_ADBC(SetupReader(error)); // If the caller did not request a result set or if there are no // inferred output columns (e.g. a CREATE or UPDATE), then don't // use COPY (which would fail anyways) - if (!stream || root_type.n_children() == 0) { + if (!stream || reader_.copy_reader_->pg_type().n_children() == 0) { RAISE_ADBC(ExecuteUpdateQuery(rows_affected, error)); if (stream) { struct ArrowSchema schema; @@ -924,7 +916,8 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct ArrowArrayStream* stream, // This resolves the reader specific to each PostgresType -> ArrowSchema // conversion. It is unlikely that this will fail given that we have just // inferred these conversions ourselves. - na_res = reader_.copy_reader_->InitFieldReaders(&na_error); + struct ArrowError na_error; + int na_res = reader_.copy_reader_->InitFieldReaders(&na_error); if (na_res != NANOARROW_OK) { SetError(error, "[libpq] Failed to initialize field readers: %s", na_error.message); return na_res; @@ -953,6 +946,23 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct ArrowArrayStream* stream, return ADBC_STATUS_OK; } +AdbcStatusCode PostgresStatement::ExecuteSchema(struct ArrowSchema* schema, + struct AdbcError* error) { + ClearResult(); + if (query_.empty()) { + SetError(error, "%s", "[libpq] Must SetSqlQuery before ExecuteQuery"); + return ADBC_STATUS_INVALID_STATE; + } else if (bind_.release) { + // TODO: if we have parameters, bind them (since they can affect the output schema) + SetError(error, "[libpq] ExecuteSchema with parameters is not implemented"); + return ADBC_STATUS_NOT_IMPLEMENTED; + } + + RAISE_ADBC(SetupReader(error)); + CHECK_NA(INTERNAL, reader_.copy_reader_->GetSchema(schema), error); + return ADBC_STATUS_OK; +} + AdbcStatusCode PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected, struct AdbcError* error) { if (!bind_.release) { @@ -964,12 +974,8 @@ AdbcStatusCode PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected, std::memset(&bind_, 0, sizeof(bind_)); RAISE_ADBC(bind_stream.Begin( [&]() -> AdbcStatusCode { - if (!ingest_.append) { - // CREATE TABLE - return CreateBulkTable(bind_stream.bind_schema.value, - bind_stream.bind_schema_fields, error); - } - return ADBC_STATUS_OK; + return CreateBulkTable(bind_stream.bind_schema.value, + bind_stream.bind_schema_fields, error); }, error)); RAISE_ADBC(bind_stream.SetParamTypes(*type_resolver_, error)); @@ -997,7 +1003,8 @@ AdbcStatusCode PostgresStatement::ExecuteUpdateQuery(int64_t* rows_affected, PQexecPrepared(connection_->conn(), /*stmtName=*/"", /*nParams=*/0, /*paramValues=*/nullptr, /*paramLengths=*/nullptr, /*paramFormats=*/nullptr, /*resultFormat=*/kPgBinaryFormat); - if (PQresultStatus(result) != PGRES_COMMAND_OK) { + ExecStatusType status = PQresultStatus(result); + if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) { SetError(error, "[libpq] Failed to execute query: %s\nQuery was:%s", PQerrorMessage(connection_->conn()), query_.c_str()); PQclear(result); @@ -1008,6 +1015,64 @@ AdbcStatusCode PostgresStatement::ExecuteUpdateQuery(int64_t* rows_affected, return ADBC_STATUS_OK; } +AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value, size_t* length, + struct AdbcError* error) { + std::string result; + if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) { + result = ingest_.target; + } else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) { + switch (ingest_.mode) { + case IngestMode::kCreate: + result = ADBC_INGEST_OPTION_MODE_CREATE; + break; + case IngestMode::kAppend: + result = ADBC_INGEST_OPTION_MODE_APPEND; + break; + case IngestMode::kReplace: + result = ADBC_INGEST_OPTION_MODE_REPLACE; + break; + case IngestMode::kCreateAppend: + result = ADBC_INGEST_OPTION_MODE_CREATE_APPEND; + break; + } + } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) { + result = std::to_string(reader_.batch_size_hint_bytes_); + } else { + SetError(error, "[libq] Unknown statement option '%s'", key); + return ADBC_STATUS_NOT_FOUND; + } + + if (result.size() + 1 <= *length) { + std::memcpy(value, result.data(), result.size() + 1); + } + *length = static_cast(result.size() + 1); + return ADBC_STATUS_OK; +} + +AdbcStatusCode PostgresStatement::GetOptionBytes(const char* key, uint8_t* value, + size_t* length, + struct AdbcError* error) { + SetError(error, "[libq] Unknown statement option '%s'", key); + return ADBC_STATUS_NOT_FOUND; +} + +AdbcStatusCode PostgresStatement::GetOptionDouble(const char* key, double* value, + struct AdbcError* error) { + SetError(error, "[libq] Unknown statement option '%s'", key); + return ADBC_STATUS_NOT_FOUND; +} + +AdbcStatusCode PostgresStatement::GetOptionInt(const char* key, int64_t* value, + struct AdbcError* error) { + std::string result; + if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) { + *value = reader_.batch_size_hint_bytes_; + return ADBC_STATUS_OK; + } + SetError(error, "[libq] Unknown statement option '%s'", key); + return ADBC_STATUS_NOT_FOUND; +} + AdbcStatusCode PostgresStatement::GetParameterSchema(struct ArrowSchema* schema, struct AdbcError* error) { return ADBC_STATUS_NOT_IMPLEMENTED; @@ -1048,14 +1113,18 @@ AdbcStatusCode PostgresStatement::SetOption(const char* key, const char* value, ingest_.target = value; } else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) { if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) { - ingest_.append = false; + ingest_.mode = IngestMode::kCreate; } else if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_APPEND) == 0) { - ingest_.append = true; + ingest_.mode = IngestMode::kAppend; + } else if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_REPLACE) == 0) { + ingest_.mode = IngestMode::kReplace; + } else if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE_APPEND) == 0) { + ingest_.mode = IngestMode::kCreateAppend; } else { SetError(error, "[libpq] Invalid value '%s' for option '%s'", value, key); return ADBC_STATUS_INVALID_ARGUMENT; } - } else if (std::strcmp(value, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES)) { + } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) { int64_t int_value = std::atol(value); if (int_value <= 0) { SetError(error, "[libpq] Invalid value '%s' for option '%s'", value, key); @@ -1070,6 +1139,76 @@ AdbcStatusCode PostgresStatement::SetOption(const char* key, const char* value, return ADBC_STATUS_OK; } +AdbcStatusCode PostgresStatement::SetOptionBytes(const char* key, const uint8_t* value, + size_t length, struct AdbcError* error) { + SetError(error, "%s%s", "[libpq] Unknown option ", key); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode PostgresStatement::SetOptionDouble(const char* key, double value, + struct AdbcError* error) { + SetError(error, "%s%s", "[libpq] Unknown option ", key); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode PostgresStatement::SetOptionInt(const char* key, int64_t value, + struct AdbcError* error) { + if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) { + if (value <= 0) { + SetError(error, "[libpq] Invalid value '%" PRIi64 "' for option '%s'", value, key); + return ADBC_STATUS_INVALID_ARGUMENT; + } + + this->reader_.batch_size_hint_bytes_ = value; + return ADBC_STATUS_OK; + } + SetError(error, "%s%s", "[libpq] Unknown option ", key); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode PostgresStatement::SetupReader(struct AdbcError* error) { + // TODO: we should pipeline here and assume this will succeed + PGresult* result = PQprepare(connection_->conn(), /*stmtName=*/"", query_.c_str(), + /*nParams=*/0, nullptr); + if (PQresultStatus(result) != PGRES_COMMAND_OK) { + SetError(error, + "[libpq] Failed to execute query: could not infer schema: failed to " + "prepare query: %s\nQuery was:%s", + PQerrorMessage(connection_->conn()), query_.c_str()); + PQclear(result); + return ADBC_STATUS_IO; + } + PQclear(result); + result = PQdescribePrepared(connection_->conn(), /*stmtName=*/""); + if (PQresultStatus(result) != PGRES_COMMAND_OK) { + SetError(error, + "[libpq] Failed to execute query: could not infer schema: failed to " + "describe prepared statement: %s\nQuery was:%s", + PQerrorMessage(connection_->conn()), query_.c_str()); + PQclear(result); + return ADBC_STATUS_IO; + } + + // Resolve the information from the PGresult into a PostgresType + PostgresType root_type; + AdbcStatusCode status = ResolvePostgresType(*type_resolver_, result, &root_type, error); + PQclear(result); + if (status != ADBC_STATUS_OK) return status; + + // Initialize the copy reader and infer the output schema (i.e., error for + // unsupported types before issuing the COPY query) + reader_.copy_reader_.reset(new PostgresCopyStreamReader()); + reader_.copy_reader_->Init(root_type); + struct ArrowError na_error; + int na_res = reader_.copy_reader_->InferOutputSchema(&na_error); + if (na_res != NANOARROW_OK) { + SetError(error, "[libpq] Failed to infer output schema: (%d) %s: %s", na_res, + std::strerror(na_res), na_error.message); + return ADBC_STATUS_INTERNAL; + } + return ADBC_STATUS_OK; +} + void PostgresStatement::ClearResult() { // TODO: we may want to synchronize here for safety reader_.Release(); diff --git a/c/driver/postgresql/statement.h b/c/driver/postgresql/statement.h index 62af2457d5..eca3997153 100644 --- a/c/driver/postgresql/statement.h +++ b/c/driver/postgresql/statement.h @@ -100,13 +100,25 @@ class PostgresStatement { AdbcStatusCode Bind(struct ArrowArray* values, struct ArrowSchema* schema, struct AdbcError* error); AdbcStatusCode Bind(struct ArrowArrayStream* stream, struct AdbcError* error); + AdbcStatusCode Cancel(struct AdbcError* error); AdbcStatusCode ExecuteQuery(struct ArrowArrayStream* stream, int64_t* rows_affected, struct AdbcError* error); + AdbcStatusCode ExecuteSchema(struct ArrowSchema* schema, struct AdbcError* error); + AdbcStatusCode GetOption(const char* key, char* value, size_t* length, + struct AdbcError* error); + AdbcStatusCode GetOptionBytes(const char* key, uint8_t* value, size_t* length, + struct AdbcError* error); + AdbcStatusCode GetOptionDouble(const char* key, double* value, struct AdbcError* error); + AdbcStatusCode GetOptionInt(const char* key, int64_t* value, struct AdbcError* error); AdbcStatusCode GetParameterSchema(struct ArrowSchema* schema, struct AdbcError* error); AdbcStatusCode New(struct AdbcConnection* connection, struct AdbcError* error); AdbcStatusCode Prepare(struct AdbcError* error); AdbcStatusCode Release(struct AdbcError* error); AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error); + AdbcStatusCode SetOptionBytes(const char* key, const uint8_t* value, size_t length, + struct AdbcError* error); + AdbcStatusCode SetOptionDouble(const char* key, double value, struct AdbcError* error); + AdbcStatusCode SetOptionInt(const char* key, int64_t value, struct AdbcError* error); AdbcStatusCode SetSqlQuery(const char* query, struct AdbcError* error); // --------------------------------------------------------------------- @@ -122,6 +134,7 @@ class PostgresStatement { AdbcStatusCode ExecutePreparedStatement(struct ArrowArrayStream* stream, int64_t* rows_affected, struct AdbcError* error); + AdbcStatusCode SetupReader(struct AdbcError* error); private: std::shared_ptr type_resolver_; @@ -133,9 +146,16 @@ class PostgresStatement { struct ArrowArrayStream bind_; // Bulk ingest state + enum class IngestMode { + kCreate, + kAppend, + kReplace, + kCreateAppend, + }; + struct { std::string target; - bool append = false; + IngestMode mode = IngestMode::kCreate; } ingest_; TupleReader reader_; diff --git a/c/driver/snowflake/snowflake_test.cc b/c/driver/snowflake/snowflake_test.cc index c8e08a568f..b3833d78c7 100644 --- a/c/driver/snowflake/snowflake_test.cc +++ b/c/driver/snowflake/snowflake_test.cc @@ -106,11 +106,11 @@ class SnowflakeQuirks : public adbc_validation::DriverQuirks { } std::string BindParameter(int index) const override { return "?"; } + bool supports_bulk_ingest(const char* /*mode*/) const override { return true; } bool supports_concurrent_statements() const override { return true; } bool supports_transactions() const override { return true; } bool supports_get_sql_info() const override { return false; } bool supports_get_objects() const override { return true; } - bool supports_bulk_ingest() const override { return true; } bool supports_partitioned_data() const override { return false; } bool supports_dynamic_parameter_binding() const override { return false; } bool ddl_implicit_commit_txn() const override { return true; } diff --git a/c/driver/sqlite/sqlite.c b/c/driver/sqlite/sqlite.c index 80da86746a..6d2f344562 100644 --- a/c/driver/sqlite/sqlite.c +++ b/c/driver/sqlite/sqlite.c @@ -86,6 +86,26 @@ AdbcStatusCode SqliteDatabaseSetOption(struct AdbcDatabase* database, const char return ADBC_STATUS_NOT_IMPLEMENTED; } +AdbcStatusCode SqliteDatabaseSetOptionBytes(struct AdbcDatabase* database, + const char* key, const uint8_t* value, + size_t length, struct AdbcError* error) { + CHECK_DB_INIT(database, error); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode SqliteDatabaseSetOptionDouble(struct AdbcDatabase* database, + const char* key, double value, + struct AdbcError* error) { + CHECK_DB_INIT(database, error); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode SqliteDatabaseSetOptionInt(struct AdbcDatabase* database, const char* key, + int64_t value, struct AdbcError* error) { + CHECK_DB_INIT(database, error); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + int OpenDatabase(const char* maybe_uri, sqlite3** db, struct AdbcError* error) { const char* uri = maybe_uri ? maybe_uri : kDefaultUri; int rc = sqlite3_open_v2(uri, db, @@ -120,6 +140,33 @@ AdbcStatusCode ExecuteQuery(struct SqliteConnection* conn, const char* query, return ADBC_STATUS_OK; } +AdbcStatusCode SqliteDatabaseGetOption(struct AdbcDatabase* database, const char* key, + char* value, size_t* length, + struct AdbcError* error) { + CHECK_DB_INIT(database, error); + return ADBC_STATUS_NOT_FOUND; +} + +AdbcStatusCode SqliteDatabaseGetOptionBytes(struct AdbcDatabase* database, + const char* key, uint8_t* value, + size_t* length, struct AdbcError* error) { + CHECK_DB_INIT(database, error); + return ADBC_STATUS_NOT_FOUND; +} + +AdbcStatusCode SqliteDatabaseGetOptionDouble(struct AdbcDatabase* database, + const char* key, double* value, + struct AdbcError* error) { + CHECK_DB_INIT(database, error); + return ADBC_STATUS_NOT_FOUND; +} + +AdbcStatusCode SqliteDatabaseGetOptionInt(struct AdbcDatabase* database, const char* key, + int64_t* value, struct AdbcError* error) { + CHECK_DB_INIT(database, error); + return ADBC_STATUS_NOT_FOUND; +} + AdbcStatusCode SqliteDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) { CHECK_DB_INIT(database, error); @@ -204,6 +251,27 @@ AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection, return ADBC_STATUS_NOT_IMPLEMENTED; } +AdbcStatusCode SqliteConnectionSetOptionBytes(struct AdbcConnection* connection, + const char* key, const uint8_t* value, + size_t length, struct AdbcError* error) { + CHECK_DB_INIT(connection, error); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode SqliteConnectionSetOptionDouble(struct AdbcConnection* connection, + const char* key, double value, + struct AdbcError* error) { + CHECK_DB_INIT(connection, error); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode SqliteConnectionSetOptionInt(struct AdbcConnection* connection, + const char* key, int64_t value, + struct AdbcError* error) { + CHECK_DB_INIT(connection, error); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + AdbcStatusCode SqliteConnectionInit(struct AdbcConnection* connection, struct AdbcDatabase* database, struct AdbcError* error) { @@ -754,6 +822,34 @@ AdbcStatusCode SqliteConnectionGetObjects(struct AdbcConnection* connection, int return BatchToArrayStream(&array, &schema, out, error); } +AdbcStatusCode SqliteConnectionGetOption(struct AdbcConnection* connection, + const char* key, char* value, size_t* length, + struct AdbcError* error) { + CHECK_DB_INIT(connection, error); + return ADBC_STATUS_NOT_FOUND; +} + +AdbcStatusCode SqliteConnectionGetOptionBytes(struct AdbcConnection* connection, + const char* key, uint8_t* value, + size_t* length, struct AdbcError* error) { + CHECK_DB_INIT(connection, error); + return ADBC_STATUS_NOT_FOUND; +} + +AdbcStatusCode SqliteConnectionGetOptionDouble(struct AdbcConnection* connection, + const char* key, double* value, + struct AdbcError* error) { + CHECK_DB_INIT(connection, error); + return ADBC_STATUS_NOT_FOUND; +} + +AdbcStatusCode SqliteConnectionGetOptionInt(struct AdbcConnection* connection, + const char* key, int64_t* value, + struct AdbcError* error) { + CHECK_DB_INIT(connection, error); + return ADBC_STATUS_NOT_FOUND; +} + AdbcStatusCode SqliteConnectionGetTableSchema(struct AdbcConnection* connection, const char* catalog, const char* db_schema, const char* table_name, @@ -1243,6 +1339,34 @@ AdbcStatusCode SqliteStatementBindStream(struct AdbcStatement* statement, return AdbcSqliteBinderSetArrayStream(&stmt->binder, stream, error); } +AdbcStatusCode SqliteStatementGetOption(struct AdbcStatement* statement, const char* key, + char* value, size_t* length, + struct AdbcError* error) { + CHECK_DB_INIT(statement, error); + return ADBC_STATUS_NOT_FOUND; +} + +AdbcStatusCode SqliteStatementGetOptionBytes(struct AdbcStatement* statement, + const char* key, uint8_t* value, + size_t* length, struct AdbcError* error) { + CHECK_DB_INIT(statement, error); + return ADBC_STATUS_NOT_FOUND; +} + +AdbcStatusCode SqliteStatementGetOptionDouble(struct AdbcStatement* statement, + const char* key, double* value, + struct AdbcError* error) { + CHECK_DB_INIT(statement, error); + return ADBC_STATUS_NOT_FOUND; +} + +AdbcStatusCode SqliteStatementGetOptionInt(struct AdbcStatement* statement, + const char* key, int64_t* value, + struct AdbcError* error) { + CHECK_DB_INIT(statement, error); + return ADBC_STATUS_NOT_FOUND; +} + AdbcStatusCode SqliteStatementGetParameterSchema(struct AdbcStatement* statement, struct ArrowSchema* schema, struct AdbcError* error) { @@ -1332,6 +1456,27 @@ AdbcStatusCode SqliteStatementSetOption(struct AdbcStatement* statement, const c return ADBC_STATUS_NOT_IMPLEMENTED; } +AdbcStatusCode SqliteStatementSetOptionBytes(struct AdbcStatement* statement, + const char* key, const uint8_t* value, + size_t length, struct AdbcError* error) { + CHECK_DB_INIT(statement, error); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode SqliteStatementSetOptionDouble(struct AdbcStatement* statement, + const char* key, double value, + struct AdbcError* error) { + CHECK_DB_INIT(statement, error); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode SqliteStatementSetOptionInt(struct AdbcStatement* statement, + const char* key, int64_t value, + struct AdbcError* error) { + CHECK_DB_INIT(statement, error); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + AdbcStatusCode SqliteStatementExecutePartitions(struct AdbcStatement* statement, struct ArrowSchema* schema, struct AdbcPartitions* partitions, @@ -1382,24 +1527,91 @@ AdbcStatusCode SqliteDriverInit(int version, void* raw_driver, struct AdbcError* // Public names -AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) { - return SqliteDatabaseNew(database, error); +AdbcStatusCode AdbcDatabaseGetOption(struct AdbcDatabase* database, const char* key, + char* value, size_t* length, + struct AdbcError* error) { + return SqliteDatabaseGetOption(database, key, value, length, error); } -AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char* key, - const char* value, struct AdbcError* error) { - return SqliteDatabaseSetOption(database, key, value, error); +AdbcStatusCode AdbcDatabaseGetOptionBytes(struct AdbcDatabase* database, const char* key, + uint8_t* value, size_t* length, + struct AdbcError* error) { + return SqliteDatabaseGetOptionBytes(database, key, value, length, error); +} + +AdbcStatusCode AdbcDatabaseGetOptionInt(struct AdbcDatabase* database, const char* key, + int64_t* value, struct AdbcError* error) { + return SqliteDatabaseGetOptionInt(database, key, value, error); +} + +AdbcStatusCode AdbcDatabaseGetOptionDouble(struct AdbcDatabase* database, const char* key, + double* value, struct AdbcError* error) { + return SqliteDatabaseGetOptionDouble(database, key, value, error); } AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) { return SqliteDatabaseInit(database, error); } +AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) { + return SqliteDatabaseNew(database, error); +} + AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database, struct AdbcError* error) { return SqliteDatabaseRelease(database, error); } +AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char* key, + const char* value, struct AdbcError* error) { + return SqliteDatabaseSetOption(database, key, value, error); +} + +AdbcStatusCode AdbcDatabaseSetOptionBytes(struct AdbcDatabase* database, const char* key, + const uint8_t* value, size_t length, + struct AdbcError* error) { + return SqliteDatabaseSetOptionBytes(database, key, value, length, error); +} + +AdbcStatusCode AdbcDatabaseSetOptionInt(struct AdbcDatabase* database, const char* key, + int64_t value, struct AdbcError* error) { + return SqliteDatabaseSetOptionInt(database, key, value, error); +} + +AdbcStatusCode AdbcDatabaseSetOptionDouble(struct AdbcDatabase* database, const char* key, + double value, struct AdbcError* error) { + return SqliteDatabaseSetOptionDouble(database, key, value, error); +} + +AdbcStatusCode AdbcConnectionCancel(struct AdbcConnection* connection, + struct AdbcError* error) { + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode AdbcConnectionGetOption(struct AdbcConnection* connection, const char* key, + char* value, size_t* length, + struct AdbcError* error) { + return SqliteConnectionGetOption(connection, key, value, length, error); +} + +AdbcStatusCode AdbcConnectionGetOptionBytes(struct AdbcConnection* connection, + const char* key, uint8_t* value, + size_t* length, struct AdbcError* error) { + return SqliteConnectionGetOptionBytes(connection, key, value, length, error); +} + +AdbcStatusCode AdbcConnectionGetOptionInt(struct AdbcConnection* connection, + const char* key, int64_t* value, + struct AdbcError* error) { + return SqliteConnectionGetOptionInt(connection, key, value, error); +} + +AdbcStatusCode AdbcConnectionGetOptionDouble(struct AdbcConnection* connection, + const char* key, double* value, + struct AdbcError* error) { + return SqliteConnectionGetOptionDouble(connection, key, value, error); +} + AdbcStatusCode AdbcConnectionNew(struct AdbcConnection* connection, struct AdbcError* error) { return SqliteConnectionNew(connection, error); @@ -1410,6 +1622,24 @@ AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const return SqliteConnectionSetOption(connection, key, value, error); } +AdbcStatusCode AdbcConnectionSetOptionBytes(struct AdbcConnection* connection, + const char* key, const uint8_t* value, + size_t length, struct AdbcError* error) { + return SqliteConnectionSetOptionBytes(connection, key, value, length, error); +} + +AdbcStatusCode AdbcConnectionSetOptionInt(struct AdbcConnection* connection, + const char* key, int64_t value, + struct AdbcError* error) { + return SqliteConnectionSetOptionInt(connection, key, value, error); +} + +AdbcStatusCode AdbcConnectionSetOptionDouble(struct AdbcConnection* connection, + const char* key, double value, + struct AdbcError* error) { + return SqliteConnectionSetOptionDouble(connection, key, value, error); +} + AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection, struct AdbcDatabase* database, struct AdbcError* error) { @@ -1472,6 +1702,11 @@ AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection, return SqliteConnectionRollback(connection, error); } +AdbcStatusCode AdbcStatementCancel(struct AdbcStatement* statement, + struct AdbcError* error) { + return ADBC_STATUS_NOT_IMPLEMENTED; +} + AdbcStatusCode AdbcStatementNew(struct AdbcConnection* connection, struct AdbcStatement* statement, struct AdbcError* error) { @@ -1490,6 +1725,12 @@ AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement, return SqliteStatementExecuteQuery(statement, out, rows_affected, error); } +AdbcStatusCode AdbcStatementExecuteSchema(struct AdbcStatement* statement, + struct ArrowSchema* schema, + struct AdbcError* error) { + return ADBC_STATUS_NOT_IMPLEMENTED; +} + AdbcStatusCode AdbcStatementPrepare(struct AdbcStatement* statement, struct AdbcError* error) { return SqliteStatementPrepare(statement, error); @@ -1518,6 +1759,29 @@ AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement, return SqliteStatementBindStream(statement, stream, error); } +AdbcStatusCode AdbcStatementGetOption(struct AdbcStatement* statement, const char* key, + char* value, size_t* length, + struct AdbcError* error) { + return SqliteStatementGetOption(statement, key, value, length, error); +} + +AdbcStatusCode AdbcStatementGetOptionBytes(struct AdbcStatement* statement, + const char* key, uint8_t* value, + size_t* length, struct AdbcError* error) { + return SqliteStatementGetOptionBytes(statement, key, value, length, error); +} + +AdbcStatusCode AdbcStatementGetOptionInt(struct AdbcStatement* statement, const char* key, + int64_t* value, struct AdbcError* error) { + return SqliteStatementGetOptionInt(statement, key, value, error); +} + +AdbcStatusCode AdbcStatementGetOptionDouble(struct AdbcStatement* statement, + const char* key, double* value, + struct AdbcError* error) { + return SqliteStatementGetOptionDouble(statement, key, value, error); +} + AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement, struct ArrowSchema* schema, struct AdbcError* error) { @@ -1529,6 +1793,23 @@ AdbcStatusCode AdbcStatementSetOption(struct AdbcStatement* statement, const cha return SqliteStatementSetOption(statement, key, value, error); } +AdbcStatusCode AdbcStatementSetOptionBytes(struct AdbcStatement* statement, + const char* key, const uint8_t* value, + size_t length, struct AdbcError* error) { + return SqliteStatementSetOptionBytes(statement, key, value, length, error); +} + +AdbcStatusCode AdbcStatementSetOptionInt(struct AdbcStatement* statement, const char* key, + int64_t value, struct AdbcError* error) { + return SqliteStatementSetOptionInt(statement, key, value, error); +} + +AdbcStatusCode AdbcStatementSetOptionDouble(struct AdbcStatement* statement, + const char* key, double value, + struct AdbcError* error) { + return SqliteStatementSetOptionDouble(statement, key, value, error); +} + AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement, struct ArrowSchema* schema, struct AdbcPartitions* partitions, diff --git a/c/driver/sqlite/sqlite_test.cc b/c/driver/sqlite/sqlite_test.cc index 3ab21fe3fa..13ab64aa49 100644 --- a/c/driver/sqlite/sqlite_test.cc +++ b/c/driver/sqlite/sqlite_test.cc @@ -91,7 +91,27 @@ class SqliteQuirks : public adbc_validation::DriverQuirks { return ddl; } + bool supports_bulk_ingest(const char* mode) const override { + return std::strcmp(mode, ADBC_INGEST_OPTION_MODE_APPEND) == 0 || + std::strcmp(mode, ADBC_INGEST_OPTION_MODE_CREATE) == 0; + } bool supports_concurrent_statements() const override { return true; } + bool supports_get_option() const override { return false; } + std::optional supports_get_sql_info( + uint32_t info_code) const override { + switch (info_code) { + case ADBC_INFO_DRIVER_NAME: + return "ADBC SQLite Driver"; + case ADBC_INFO_DRIVER_VERSION: + return "(unknown)"; + case ADBC_INFO_VENDOR_NAME: + return "SQLite"; + case ADBC_INFO_VENDOR_VERSION: + return "3."; + default: + return std::nullopt; + } + } std::string catalog() const override { return "main"; } std::string db_schema() const override { return ""; } diff --git a/c/integration/duckdb/duckdb_test.cc b/c/integration/duckdb/duckdb_test.cc index fd6e1984e6..26f1efe4bd 100644 --- a/c/integration/duckdb/duckdb_test.cc +++ b/c/integration/duckdb/duckdb_test.cc @@ -46,7 +46,7 @@ class DuckDbQuirks : public adbc_validation::DriverQuirks { std::string BindParameter(int index) const override { return "?"; } - bool supports_bulk_ingest() const override { return false; } + bool supports_bulk_ingest(const char* /*mode*/) const override { return false; } bool supports_concurrent_statements() const override { return true; } bool supports_dynamic_parameter_binding() const override { return false; } bool supports_get_sql_info() const override { return false; } diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc index 094e53cf2c..e0daeca9aa 100644 --- a/c/validation/adbc_validation.cc +++ b/c/validation/adbc_validation.cc @@ -34,6 +34,7 @@ #include #include #include +#include #include "adbc_validation_util.h" @@ -102,7 +103,7 @@ AdbcStatusCode DriverQuirks::EnsureSampleTable(struct AdbcConnection* connection AdbcStatusCode DriverQuirks::CreateSampleTable(struct AdbcConnection* connection, const std::string& name, struct AdbcError* error) const { - if (!supports_bulk_ingest()) { + if (!supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) { return ADBC_STATUS_NOT_IMPLEMENTED; } return DoIngestSampleTable(connection, name, error); @@ -248,6 +249,56 @@ void ConnectionTest::TestAutocommitToggle() { //------------------------------------------------------------ // Tests of metadata +std::optional ConnectionGetOption(struct AdbcConnection* connection, + std::string_view option, + struct AdbcError* error) { + char buffer[128]; + size_t buffer_size = sizeof(buffer); + AdbcStatusCode status = + AdbcConnectionGetOption(connection, option.data(), buffer, &buffer_size, error); + EXPECT_THAT(status, IsOkStatus(error)); + if (status != ADBC_STATUS_OK) return std::nullopt; + EXPECT_GT(buffer_size, 0); + if (buffer_size == 0) return std::nullopt; + return std::string(buffer, buffer_size - 1); +} + +void ConnectionTest::TestMetadataCurrentCatalog() { + ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); + + if (quirks()->supports_metadata_current_catalog()) { + ASSERT_THAT( + ConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_CURRENT_CATALOG, &error), + ::testing::Optional(quirks()->catalog())); + } else { + char buffer[128]; + size_t buffer_size = sizeof(buffer); + ASSERT_THAT( + AdbcConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_CURRENT_CATALOG, + buffer, &buffer_size, &error), + IsStatus(ADBC_STATUS_NOT_FOUND)); + } +} + +void ConnectionTest::TestMetadataCurrentDbSchema() { + ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); + + if (quirks()->supports_metadata_current_db_schema()) { + ASSERT_THAT(ConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA, + &error), + ::testing::Optional(quirks()->db_schema())); + } else { + char buffer[128]; + size_t buffer_size = sizeof(buffer); + ASSERT_THAT( + AdbcConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA, + buffer, &buffer_size, &error), + IsStatus(ADBC_STATUS_NOT_FOUND)); + } +} + void ConnectionTest::TestMetadataGetInfo() { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); @@ -263,6 +314,11 @@ void ConnectionTest::TestMetadataGetInfo() { ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION, }) { + SCOPED_TRACE("info_code = " + std::to_string(info_code)); + std::optional expected = quirks()->supports_get_sql_info(info_code); + + if (!expected.has_value()) continue; + uint32_t info[] = {info_code}; StreamReader reader; @@ -317,7 +373,6 @@ void ConnectionTest::TestMetadataGetInfo() { reader.array_view->children[0]->buffer_views[1].data.as_uint32[row]; seen.push_back(code); - std::optional expected = quirks()->supports_get_sql_info(code); ASSERT_TRUE(expected.has_value()) << "Got unexpected info code " << code; uint8_t type_code = @@ -329,16 +384,16 @@ void ConnectionTest::TestMetadataGetInfo() { using T = std::decay_t; if constexpr (std::is_same_v) { ASSERT_EQ(uint8_t(2), type_code); - ASSERT_EQ(expected_value, + EXPECT_EQ(expected_value, ArrowArrayViewGetIntUnsafe( reader.array_view->children[1]->children[2], offset)); } else if constexpr (std::is_same_v) { ASSERT_EQ(uint8_t(0), type_code); struct ArrowStringView view = ArrowArrayViewGetStringUnsafe( reader.array_view->children[1]->children[0], offset); - ASSERT_EQ(expected_value, - std::string_view(static_cast(view.data), - view.size_bytes)); + EXPECT_THAT(std::string_view(static_cast(view.data), + view.size_bytes), + ::testing::HasSubstr(expected_value)); } else { static_assert(!sizeof(T), "not yet implemented"); } @@ -347,12 +402,12 @@ void ConnectionTest::TestMetadataGetInfo() { << "code: " << type_code; } } - ASSERT_THAT(seen, ::testing::IsSupersetOf(info)); + EXPECT_THAT(seen, ::testing::IsSupersetOf(info)); } } void ConnectionTest::TestMetadataGetTableSchema() { - if (!quirks()->supports_bulk_ingest()) { + if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) { GTEST_SKIP(); } ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); @@ -932,6 +987,33 @@ void ConnectionTest::TestMetadataGetObjectsPrimaryKey() { ASSERT_EQ(constraint_column_name, "id"); } +void ConnectionTest::TestMetadataGetObjectsCancel() { + if (!quirks()->supports_cancel() || !quirks()->supports_get_objects()) { + GTEST_SKIP(); + } + + ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); + + StreamReader reader; + ASSERT_THAT( + AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_CATALOGS, nullptr, nullptr, + nullptr, nullptr, nullptr, &reader.stream.value, &error), + IsOkStatus(&error)); + ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); + + ASSERT_THAT(AdbcConnectionCancel(&connection, &error), IsOkStatus(&error)); + + while (true) { + int err = reader.MaybeNext(); + if (err != 0) { + ASSERT_THAT(err, ::testing::AnyOf(0, IsErrno(ECANCELED, &reader.stream.value, + /*ArrowError*/ nullptr))); + } + if (!reader.array->release) break; + } +} + //------------------------------------------------------------ // Tests of AdbcStatement @@ -986,7 +1068,7 @@ void StatementTest::TestRelease() { template void StatementTest::TestSqlIngestType(ArrowType type, const std::vector>& values) { - if (!quirks()->supports_bulk_ingest()) { + if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) { GTEST_SKIP(); } @@ -1118,7 +1200,7 @@ void StatementTest::TestSqlIngestBinary() { template void StatementTest::TestSqlIngestTemporalType(const char* timezone) { - if (!quirks()->supports_bulk_ingest()) { + if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) { GTEST_SKIP(); } @@ -1240,7 +1322,8 @@ void StatementTest::TestSqlIngestTableEscaping() { } void StatementTest::TestSqlIngestAppend() { - if (!quirks()->supports_bulk_ingest()) { + if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE) || + !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_APPEND)) { GTEST_SKIP(); } @@ -1318,8 +1401,185 @@ void StatementTest::TestSqlIngestAppend() { ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); } +void StatementTest::TestSqlIngestReplace() { + if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_REPLACE)) { + GTEST_SKIP(); + } + + // Ingest + + Handle schema; + Handle array; + struct ArrowError na_error; + ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno()); + ASSERT_THAT(MakeBatch(&schema.value, &array.value, &na_error, {42}), + IsOkErrno()); + + ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, + "bulk_ingest", &error), + IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE, + ADBC_INGEST_OPTION_MODE_REPLACE, &error), + IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), + IsOkStatus(&error)); + + int64_t rows_affected = 0; + ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error), + IsOkStatus(&error)); + ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))); + + // Read data back + ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM bulk_ingest", &error), + IsOkStatus(&error)); + { + StreamReader reader; + ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, + &reader.rows_affected, &error), + IsOkStatus(&error)); + ASSERT_THAT(reader.rows_affected, + ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))); + + ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); + ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value, + {{"int64s", NANOARROW_TYPE_INT64, NULLABLE}})); + + ASSERT_NO_FATAL_FAILURE(reader.Next()); + ASSERT_NE(nullptr, reader.array->release); + ASSERT_EQ(1, reader.array->length); + ASSERT_EQ(1, reader.array->n_children); + + ASSERT_NO_FATAL_FAILURE(CompareArray(reader.array_view->children[0], {42})); + + ASSERT_NO_FATAL_FAILURE(reader.Next()); + ASSERT_EQ(nullptr, reader.array->release); + } + + // Replace + // Re-initialize since Bind() should take ownership of data + ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno()); + ASSERT_THAT(MakeBatch(&schema.value, &array.value, &na_error, {-42, -42}), + IsOkErrno()); + + ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, + "bulk_ingest", &error), + IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE, + ADBC_INGEST_OPTION_MODE_REPLACE, &error), + IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), + IsOkStatus(&error)); + + ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error), + IsOkStatus(&error)); + ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(2), ::testing::Eq(-1))); + + // Read data back + ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM bulk_ingest", &error), + IsOkStatus(&error)); + { + StreamReader reader; + ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, + &reader.rows_affected, &error), + IsOkStatus(&error)); + ASSERT_THAT(reader.rows_affected, + ::testing::AnyOf(::testing::Eq(2), ::testing::Eq(-1))); + + ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); + ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value, + {{"int64s", NANOARROW_TYPE_INT64, NULLABLE}})); + + ASSERT_NO_FATAL_FAILURE(reader.Next()); + ASSERT_NE(nullptr, reader.array->release); + ASSERT_EQ(2, reader.array->length); + ASSERT_EQ(1, reader.array->n_children); + + ASSERT_NO_FATAL_FAILURE( + CompareArray(reader.array_view->children[0], {-42, -42})); + + ASSERT_NO_FATAL_FAILURE(reader.Next()); + ASSERT_EQ(nullptr, reader.array->release); + } +} + +void StatementTest::TestSqlIngestCreateAppend() { + if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE_APPEND)) { + GTEST_SKIP(); + } + + ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error), + IsOkStatus(&error)); + + // Ingest + + Handle schema; + Handle array; + struct ArrowError na_error; + ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno()); + ASSERT_THAT(MakeBatch(&schema.value, &array.value, &na_error, {42}), + IsOkErrno()); + + ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, + "bulk_ingest", &error), + IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE, + ADBC_INGEST_OPTION_MODE_CREATE_APPEND, &error), + IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), + IsOkStatus(&error)); + + int64_t rows_affected = 0; + ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error), + IsOkStatus(&error)); + ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))); + + // Append + // Re-initialize since Bind() should take ownership of data + ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno()); + ASSERT_THAT(MakeBatch(&schema.value, &array.value, &na_error, {42, 42}), + IsOkErrno()); + + ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), + IsOkStatus(&error)); + + ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error), + IsOkStatus(&error)); + ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(2), ::testing::Eq(-1))); + + // Read data back + ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM bulk_ingest", &error), + IsOkStatus(&error)); + { + StreamReader reader; + ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, + &reader.rows_affected, &error), + IsOkStatus(&error)); + ASSERT_THAT(reader.rows_affected, + ::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1))); + + ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); + ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value, + {{"int64s", NANOARROW_TYPE_INT64, NULLABLE}})); + + ASSERT_NO_FATAL_FAILURE(reader.Next()); + ASSERT_NE(nullptr, reader.array->release); + ASSERT_EQ(3, reader.array->length); + ASSERT_EQ(1, reader.array->n_children); + + ASSERT_NO_FATAL_FAILURE( + CompareArray(reader.array_view->children[0], {42, 42, 42})); + + ASSERT_NO_FATAL_FAILURE(reader.Next()); + ASSERT_EQ(nullptr, reader.array->release); + } + + ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); +} + void StatementTest::TestSqlIngestErrors() { - if (!quirks()->supports_bulk_ingest()) { + if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) { GTEST_SKIP(); } @@ -1399,7 +1659,7 @@ void StatementTest::TestSqlIngestErrors() { } void StatementTest::TestSqlIngestMultipleConnections() { - if (!quirks()->supports_bulk_ingest()) { + if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) { GTEST_SKIP(); } @@ -1469,7 +1729,7 @@ void StatementTest::TestSqlIngestMultipleConnections() { } void StatementTest::TestSqlIngestSample() { - if (!quirks()->supports_bulk_ingest()) { + if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) { GTEST_SKIP(); } @@ -1715,7 +1975,7 @@ void StatementTest::TestSqlPrepareSelectParams() { } void StatementTest::TestSqlPrepareUpdate() { - if (!quirks()->supports_bulk_ingest() || + if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE) || !quirks()->supports_dynamic_parameter_binding()) { GTEST_SKIP(); } @@ -1794,7 +2054,7 @@ void StatementTest::TestSqlPrepareUpdateNoParams() { } void StatementTest::TestSqlPrepareUpdateStream() { - if (!quirks()->supports_bulk_ingest() || + if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE) || !quirks()->supports_dynamic_parameter_binding()) { GTEST_SKIP(); } @@ -2069,6 +2329,36 @@ void StatementTest::TestSqlQueryStrings() { ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); } +void StatementTest::TestSqlQueryCancel() { + if (!quirks()->supports_cancel()) { + GTEST_SKIP(); + } + + ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 'SaShiSuSeSo'", &error), + IsOkStatus(&error)); + + { + StreamReader reader; + ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, + &reader.rows_affected, &error), + IsOkStatus(&error)); + ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); + + ASSERT_THAT(AdbcStatementCancel(&statement, &error), IsOkStatus(&error)); + while (true) { + int err = reader.MaybeNext(); + if (err != 0) { + ASSERT_THAT(err, ::testing::AnyOf(0, IsErrno(ECANCELED, &reader.stream.value, + /*ArrowError*/ nullptr))); + } + if (!reader.array->release) break; + } + } + + ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); +} + void StatementTest::TestSqlQueryErrors() { // Invalid query ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); @@ -2088,6 +2378,13 @@ void StatementTest::TestTransactions() { ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); + if (quirks()->supports_get_option()) { + auto autocommit = + ConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, &error); + ASSERT_THAT(autocommit, + ::testing::Optional(::testing::StrEq(ADBC_OPTION_VALUE_ENABLED))); + } + Handle connection2; ASSERT_THAT(AdbcConnectionNew(&connection2.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection2.value, &database, &error), @@ -2097,6 +2394,13 @@ void StatementTest::TestTransactions() { ADBC_OPTION_VALUE_DISABLED, &error), IsOkStatus(&error)); + if (quirks()->supports_get_option()) { + auto autocommit = + ConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, &error); + ASSERT_THAT(autocommit, + ::testing::Optional(::testing::StrEq(ADBC_OPTION_VALUE_DISABLED))); + } + // Uncommitted change ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error)); @@ -2172,6 +2476,86 @@ void StatementTest::TestTransactions() { } } +void StatementTest::TestSqlSchemaInts() { + if (!quirks()->supports_execute_schema()) { + GTEST_SKIP() << "Not supported"; + } + + ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error), + IsOkStatus(&error)); + + nanoarrow::UniqueSchema schema; + ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error), + IsOkStatus(&error)); + + ASSERT_EQ(1, schema->n_children); + ASSERT_THAT(schema->children[0]->format, ::testing::AnyOfArray({ + ::testing::StrEq("i"), // int32 + ::testing::StrEq("l"), // int64 + })); + + ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); +} + +void StatementTest::TestSqlSchemaFloats() { + if (!quirks()->supports_execute_schema()) { + GTEST_SKIP() << "Not supported"; + } + + ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT CAST(1.5 AS FLOAT)", &error), + IsOkStatus(&error)); + + nanoarrow::UniqueSchema schema; + ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error), + IsOkStatus(&error)); + + ASSERT_EQ(1, schema->n_children); + ASSERT_THAT(schema->children[0]->format, ::testing::AnyOfArray({ + ::testing::StrEq("f"), // float32 + ::testing::StrEq("g"), // float64 + })); + + ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); +} + +void StatementTest::TestSqlSchemaStrings() { + if (!quirks()->supports_execute_schema()) { + GTEST_SKIP() << "Not supported"; + } + + ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 'hi'", &error), + IsOkStatus(&error)); + + nanoarrow::UniqueSchema schema; + ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error), + IsOkStatus(&error)); + + ASSERT_EQ(1, schema->n_children); + ASSERT_THAT(schema->children[0]->format, ::testing::AnyOfArray({ + ::testing::StrEq("u"), // string + ::testing::StrEq("U"), // large_string + })); + + ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); +} + +void StatementTest::TestSqlSchemaErrors() { + if (!quirks()->supports_execute_schema()) { + GTEST_SKIP() << "Not supported"; + } + + ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); + + nanoarrow::UniqueSchema schema; + ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error), + IsStatus(ADBC_STATUS_INVALID_STATE, &error)); + + ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); +} + void StatementTest::TestConcurrentStatements() { Handle statement1; Handle statement2; diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h index 54622ba5ca..5e560d45f5 100644 --- a/c/validation/adbc_validation.h +++ b/c/validation/adbc_validation.h @@ -88,10 +88,22 @@ class DriverQuirks { return ingest_type; } + /// \brief Whether bulk ingest is supported + virtual bool supports_bulk_ingest(const char* mode) const { return true; } + + /// \brief Whether we can cancel queries. + virtual bool supports_cancel() const { return false; } + /// \brief Whether two statements can be used at the same time on a /// single connection virtual bool supports_concurrent_statements() const { return false; } + /// \brief Whether AdbcStatementExecuteSchema should work + virtual bool supports_execute_schema() const { return false; } + + /// \brief Whether GetOption* should work + virtual bool supports_get_option() const { return true; } + /// \brief Whether AdbcStatementExecutePartitions should work virtual bool supports_partitioned_data() const { return false; } @@ -112,8 +124,11 @@ class DriverQuirks { /// \brief Whether GetObjects is implemented virtual bool supports_get_objects() const { return true; } - /// \brief Whether bulk ingest is supported - virtual bool supports_bulk_ingest() const { return true; } + /// \brief Whether we can get ADBC_CONNECTION_OPTION_CURRENT_CATALOG + virtual bool supports_metadata_current_catalog() const { return false; } + + /// \brief Whether we can get ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA + virtual bool supports_metadata_current_db_schema() const { return false; } /// \brief Whether dynamic parameter bindings are supported for prepare virtual bool supports_dynamic_parameter_binding() const { return true; } @@ -165,6 +180,9 @@ class ConnectionTest { void TestAutocommitToggle(); + void TestMetadataCurrentCatalog(); + void TestMetadataCurrentDbSchema(); + void TestMetadataGetInfo(); void TestMetadataGetTableSchema(); void TestMetadataGetTableTypes(); @@ -176,6 +194,7 @@ class ConnectionTest { void TestMetadataGetObjectsColumns(); void TestMetadataGetObjectsConstraints(); void TestMetadataGetObjectsPrimaryKey(); + void TestMetadataGetObjectsCancel(); protected: struct AdbcError error; @@ -183,28 +202,31 @@ class ConnectionTest { struct AdbcConnection connection; }; -#define ADBCV_TEST_CONNECTION(FIXTURE) \ - static_assert(std::is_base_of::value, \ - ADBCV_STRINGIFY(FIXTURE) " must inherit from ConnectionTest"); \ - TEST_F(FIXTURE, NewInit) { TestNewInit(); } \ - TEST_F(FIXTURE, Release) { TestRelease(); } \ - TEST_F(FIXTURE, Concurrent) { TestConcurrent(); } \ - TEST_F(FIXTURE, AutocommitDefault) { TestAutocommitDefault(); } \ - TEST_F(FIXTURE, AutocommitToggle) { TestAutocommitToggle(); } \ - TEST_F(FIXTURE, MetadataGetInfo) { TestMetadataGetInfo(); } \ - TEST_F(FIXTURE, MetadataGetTableSchema) { TestMetadataGetTableSchema(); } \ - TEST_F(FIXTURE, MetadataGetTableTypes) { TestMetadataGetTableTypes(); } \ - TEST_F(FIXTURE, MetadataGetObjectsCatalogs) { TestMetadataGetObjectsCatalogs(); } \ - TEST_F(FIXTURE, MetadataGetObjectsDbSchemas) { TestMetadataGetObjectsDbSchemas(); } \ - TEST_F(FIXTURE, MetadataGetObjectsTables) { TestMetadataGetObjectsTables(); } \ - TEST_F(FIXTURE, MetadataGetObjectsTablesTypes) { \ - TestMetadataGetObjectsTablesTypes(); \ - } \ - TEST_F(FIXTURE, MetadataGetObjectsColumns) { TestMetadataGetObjectsColumns(); } \ - TEST_F(FIXTURE, MetadataGetObjectsConstraints) { \ - TestMetadataGetObjectsConstraints(); \ - } \ - TEST_F(FIXTURE, MetadataGetObjectsPrimaryKey) { TestMetadataGetObjectsPrimaryKey(); } +#define ADBCV_TEST_CONNECTION(FIXTURE) \ + static_assert(std::is_base_of::value, \ + ADBCV_STRINGIFY(FIXTURE) " must inherit from ConnectionTest"); \ + TEST_F(FIXTURE, NewInit) { TestNewInit(); } \ + TEST_F(FIXTURE, Release) { TestRelease(); } \ + TEST_F(FIXTURE, Concurrent) { TestConcurrent(); } \ + TEST_F(FIXTURE, AutocommitDefault) { TestAutocommitDefault(); } \ + TEST_F(FIXTURE, AutocommitToggle) { TestAutocommitToggle(); } \ + TEST_F(FIXTURE, MetadataCurrentCatalog) { TestMetadataCurrentCatalog(); } \ + TEST_F(FIXTURE, MetadataCurrentDbSchema) { TestMetadataCurrentDbSchema(); } \ + TEST_F(FIXTURE, MetadataGetInfo) { TestMetadataGetInfo(); } \ + TEST_F(FIXTURE, MetadataGetTableSchema) { TestMetadataGetTableSchema(); } \ + TEST_F(FIXTURE, MetadataGetTableTypes) { TestMetadataGetTableTypes(); } \ + TEST_F(FIXTURE, MetadataGetObjectsCatalogs) { TestMetadataGetObjectsCatalogs(); } \ + TEST_F(FIXTURE, MetadataGetObjectsDbSchemas) { TestMetadataGetObjectsDbSchemas(); } \ + TEST_F(FIXTURE, MetadataGetObjectsTables) { TestMetadataGetObjectsTables(); } \ + TEST_F(FIXTURE, MetadataGetObjectsTablesTypes) { \ + TestMetadataGetObjectsTablesTypes(); \ + } \ + TEST_F(FIXTURE, MetadataGetObjectsColumns) { TestMetadataGetObjectsColumns(); } \ + TEST_F(FIXTURE, MetadataGetObjectsConstraints) { \ + TestMetadataGetObjectsConstraints(); \ + } \ + TEST_F(FIXTURE, MetadataGetObjectsPrimaryKey) { TestMetadataGetObjectsPrimaryKey(); } \ + TEST_F(FIXTURE, MetadataGetObjectsCancel) { TestMetadataGetObjectsCancel(); } class StatementTest { public: @@ -245,6 +267,8 @@ class StatementTest { void TestSqlIngestTableEscaping(); void TestSqlIngestAppend(); + void TestSqlIngestReplace(); + void TestSqlIngestCreateAppend(); void TestSqlIngestErrors(); void TestSqlIngestMultipleConnections(); void TestSqlIngestSample(); @@ -264,8 +288,15 @@ class StatementTest { void TestSqlQueryFloats(); void TestSqlQueryStrings(); + void TestSqlQueryCancel(); void TestSqlQueryErrors(); + void TestSqlSchemaInts(); + void TestSqlSchemaFloats(); + void TestSqlSchemaStrings(); + + void TestSqlSchemaErrors(); + void TestTransactions(); void TestConcurrentStatements(); @@ -312,6 +343,8 @@ class StatementTest { TEST_F(FIXTURE, SqlIngestTimestampTz) { TestSqlIngestTimestampTz(); } \ TEST_F(FIXTURE, SqlIngestTableEscaping) { TestSqlIngestTableEscaping(); } \ TEST_F(FIXTURE, SqlIngestAppend) { TestSqlIngestAppend(); } \ + TEST_F(FIXTURE, SqlIngestReplace) { TestSqlIngestReplace(); } \ + TEST_F(FIXTURE, SqlIngestCreateAppend) { TestSqlIngestCreateAppend(); } \ TEST_F(FIXTURE, SqlIngestErrors) { TestSqlIngestErrors(); } \ TEST_F(FIXTURE, SqlIngestMultipleConnections) { TestSqlIngestMultipleConnections(); } \ TEST_F(FIXTURE, SqlIngestSample) { TestSqlIngestSample(); } \ @@ -329,7 +362,12 @@ class StatementTest { TEST_F(FIXTURE, SqlQueryInts) { TestSqlQueryInts(); } \ TEST_F(FIXTURE, SqlQueryFloats) { TestSqlQueryFloats(); } \ TEST_F(FIXTURE, SqlQueryStrings) { TestSqlQueryStrings(); } \ + TEST_F(FIXTURE, SqlQueryCancel) { TestSqlQueryCancel(); } \ TEST_F(FIXTURE, SqlQueryErrors) { TestSqlQueryErrors(); } \ + TEST_F(FIXTURE, SqlSchemaInts) { TestSqlSchemaInts(); } \ + TEST_F(FIXTURE, SqlSchemaFloats) { TestSqlSchemaFloats(); } \ + TEST_F(FIXTURE, SqlSchemaStrings) { TestSqlSchemaStrings(); } \ + TEST_F(FIXTURE, SqlSchemaErrors) { TestSqlSchemaErrors(); } \ TEST_F(FIXTURE, Transactions) { TestTransactions(); } \ TEST_F(FIXTURE, ConcurrentStatements) { TestConcurrentStatements(); } \ TEST_F(FIXTURE, ResultInvalidation) { TestResultInvalidation(); } diff --git a/go/adbc/drivermgr/adbc.h b/go/adbc/drivermgr/adbc.h index badfc7d65a..4c13585c08 100644 --- a/go/adbc/drivermgr/adbc.h +++ b/go/adbc/drivermgr/adbc.h @@ -1292,7 +1292,8 @@ AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection, /// or while consuming an ArrowArrayStream returned from such. /// Calling this function should make the other functions return /// ADBC_STATUS_CANCELLED (from ADBC functions) or ECANCELED (from -/// methods of ArrowArrayStream). +/// methods of ArrowArrayStream). (It is not guaranteed to, for +/// instance, the result set may be buffered in memory already.) /// /// This must always be thread-safe (other operations are not). /// @@ -1947,7 +1948,8 @@ AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement, /// or while consuming an ArrowArrayStream returned from such. /// Calling this function should make the other functions return /// ADBC_STATUS_CANCELLED (from ADBC functions) or ECANCELED (from -/// methods of ArrowArrayStream). +/// methods of ArrowArrayStream). (It is not guaranteed to, for +/// instance, the result set may be buffered in memory already.) /// /// This must always be thread-safe (other operations are not). ///