Skip to content

Commit

Permalink
feat(c/driver/postgresql): implement ADBC 1.1.0 features
Browse files Browse the repository at this point in the history
- ADBC_INFO_DRIVER_ADBC_VERSION
- StatementExecuteSchema (#318)
- ADBC_CONNECTION_OPTION_CURRENT_{CATALOG, DB_SCHEMA) (#319)
- ConnectionCancel/StatementCancel
- GetOption/SetOption
- Ingest modes
  • Loading branch information
lidavidm committed Jul 26, 2023
1 parent a06e53a commit d1f8890
Show file tree
Hide file tree
Showing 26 changed files with 1,735 additions and 212 deletions.
6 changes: 4 additions & 2 deletions adbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,8 @@ AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
/// or while consuming an ArrowArrayStream returned from such.
/// Calling this function should make the other functions return
/// ADBC_STATUS_CANCELLED (from ADBC functions) or ECANCELED (from
/// methods of ArrowArrayStream).
/// methods of ArrowArrayStream). (It is not guaranteed to, for
/// instance, the result set may be buffered in memory already.)
///
/// This must always be thread-safe (other operations are not).
///
Expand Down Expand Up @@ -1947,7 +1948,8 @@ AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
/// or while consuming an ArrowArrayStream returned from such.
/// Calling this function should make the other functions return
/// ADBC_STATUS_CANCELLED (from ADBC functions) or ECANCELED (from
/// methods of ArrowArrayStream).
/// methods of ArrowArrayStream). (It is not guaranteed to, for
/// instance, the result set may be buffered in memory already.)
///
/// This must always be thread-safe (other operations are not).
///
Expand Down
13 changes: 13 additions & 0 deletions c/driver/common/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,19 @@ AdbcStatusCode AdbcConnectionGetInfoAppendString(struct ArrowArray* array,
return ADBC_STATUS_OK;
}

AdbcStatusCode AdbcConnectionGetInfoAppendInt(struct ArrowArray* array,
uint32_t info_code, int64_t info_value,
struct AdbcError* error) {
CHECK_NA(INTERNAL, ArrowArrayAppendUInt(array->children[0], info_code), error);
// Append to type variant
CHECK_NA(INTERNAL, ArrowArrayAppendInt(array->children[1]->children[2], info_value),
error);
// Append type code/offset
CHECK_NA(INTERNAL, ArrowArrayFinishUnionElement(array->children[1], /*type_id=*/2),
error);
return ADBC_STATUS_OK;
}

AdbcStatusCode AdbcInitConnectionObjectsSchema(struct ArrowSchema* schema,
struct AdbcError* error) {
ArrowSchemaInit(schema);
Expand Down
3 changes: 3 additions & 0 deletions c/driver/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ AdbcStatusCode AdbcConnectionGetInfoAppendString(struct ArrowArray* array,
uint32_t info_code,
const char* info_value,
struct AdbcError* error);
AdbcStatusCode AdbcConnectionGetInfoAppendInt(struct ArrowArray* array,
uint32_t info_code, int64_t info_value,
struct AdbcError* error);

AdbcStatusCode AdbcInitConnectionObjectsSchema(struct ArrowSchema* schema,
struct AdbcError* error);
Expand Down
3 changes: 2 additions & 1 deletion c/driver/flightsql/dremio_flightsql_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ class DremioFlightSqlQuirks : public adbc_validation::DriverQuirks {
}

std::string BindParameter(int index) const override { return "?"; }
bool supports_bulk_ingest(const char* /*mode*/) const override { return false; }
bool supports_concurrent_statements() const override { return true; }
bool supports_transactions() const override { return false; }
bool supports_get_sql_info() const override { return false; }
bool supports_get_objects() const override { return true; }
bool supports_bulk_ingest() const override { return false; }
bool supports_partitioned_data() const override { return true; }
bool supports_dynamic_parameter_binding() const override { return false; }
};
Expand Down Expand Up @@ -87,6 +87,7 @@ class DremioFlightSqlStatementTest : public ::testing::Test,
void SetUp() override { ASSERT_NO_FATAL_FAILURE(SetUpTest()); }
void TearDown() override { ASSERT_NO_FATAL_FAILURE(TearDownTest()); }

void TestResultInvalidation() { GTEST_SKIP() << "Dremio generates a CANCELLED"; }
void TestSqlIngestTableEscaping() { GTEST_SKIP() << "Table escaping not implemented"; }

protected:
Expand Down
3 changes: 2 additions & 1 deletion c/driver/flightsql/sqlite_flightsql_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class SqliteFlightSqlQuirks : public adbc_validation::DriverQuirks {
}

std::string BindParameter(int index) const override { return "?"; }

bool supports_bulk_ingest(const char* /*mode*/) const override { return false; }
bool supports_concurrent_statements() const override { return true; }
bool supports_transactions() const override { return false; }
bool supports_get_sql_info() const override { return true; }
Expand All @@ -113,7 +115,6 @@ class SqliteFlightSqlQuirks : public adbc_validation::DriverQuirks {
}
}
bool supports_get_objects() const override { return true; }
bool supports_bulk_ingest() const override { return false; }
bool supports_partitioned_data() const override { return true; }
bool supports_dynamic_parameter_binding() const override { return true; }
};
Expand Down
111 changes: 105 additions & 6 deletions c/driver/postgresql/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@
namespace {

static const uint32_t kSupportedInfoCodes[] = {
ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION, ADBC_INFO_DRIVER_NAME,
ADBC_INFO_DRIVER_VERSION, ADBC_INFO_DRIVER_ARROW_VERSION,
ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION,
ADBC_INFO_DRIVER_NAME, ADBC_INFO_DRIVER_VERSION,
ADBC_INFO_DRIVER_ARROW_VERSION, ADBC_INFO_DRIVER_ADBC_VERSION,
};

static const std::unordered_map<std::string, std::string> kPgTableTypes = {
Expand Down Expand Up @@ -114,8 +115,10 @@ class PqResultHelper {
result_ = PQexecPrepared(conn_, "", param_values_.size(), param_c_strs.data(), NULL,
NULL, 0);

if (PQresultStatus(result_) != PGRES_TUPLES_OK) {
SetError(error_, "[libpq] Failed to execute query: %s", PQerrorMessage(conn_));
ExecStatusType status = PQresultStatus(result_);
if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK) {
SetError(error_, "[libpq] Failed to execute query '%s': %s", query_.c_str(),
PQerrorMessage(conn_));
return ADBC_STATUS_IO;
}

Expand Down Expand Up @@ -729,6 +732,20 @@ class PqGetObjectsHelper {

namespace adbcpq {

AdbcStatusCode PostgresConnection::Cancel(struct AdbcError* error) {
// > errbuf must be a char array of size errbufsize (the recommended size is
// > 256 bytes).
// https://www.postgresql.org/docs/current/libpq-cancel.html
char errbuf[256];
// > The return value is 1 if the cancel request was successfully dispatched
// > and 0 if not.
if (PQcancel(cancel_, errbuf, sizeof(errbuf)) != 1) {
SetError(error, "[libpq] Failed to cancel operation: %s", errbuf);
return ADBC_STATUS_UNKNOWN;
}
return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresConnection::Commit(struct AdbcError* error) {
if (autocommit_) {
SetError(error, "%s", "[libpq] Cannot commit when autocommit is enabled");
Expand Down Expand Up @@ -776,6 +793,10 @@ AdbcStatusCode PostgresConnectionGetInfoImpl(const uint32_t* info_codes,
RAISE_ADBC(AdbcConnectionGetInfoAppendString(array, info_codes[i],
NANOARROW_VERSION, error));
break;
case ADBC_INFO_DRIVER_ADBC_VERSION:
RAISE_ADBC(AdbcConnectionGetInfoAppendInt(array, info_codes[i],
ADBC_VERSION_1_1_0, error));
break;
default:
// Ignore
continue;
Expand Down Expand Up @@ -840,6 +861,47 @@ AdbcStatusCode PostgresConnection::GetObjects(
return BatchToArrayStream(&array, &schema, out, error);
}

AdbcStatusCode PostgresConnection::GetOption(const char* option, char* value,
size_t* length, struct AdbcError* error) {
std::string output;
if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_CATALOG) == 0) {
output = PQdb(conn_);
} else if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) {
PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA", {}, error};
RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT CURRENT_SCHEMA'");
return ADBC_STATUS_INTERNAL;
}
output = (*it)[0].data;
} else if (std::strcmp(option, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
output = autocommit_ ? ADBC_OPTION_VALUE_ENABLED : ADBC_OPTION_VALUE_DISABLED;
} else {
return ADBC_STATUS_NOT_FOUND;
}

if (output.size() + 1 <= *length) {
std::memcpy(value, output.c_str(), output.size() + 1);
}
*length = output.size() + 1;
return ADBC_STATUS_OK;
}
AdbcStatusCode PostgresConnection::GetOptionBytes(const char* option, uint8_t* value,
size_t* length,
struct AdbcError* error) {
return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode PostgresConnection::GetOptionInt(const char* option, int64_t* value,
struct AdbcError* error) {
return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode PostgresConnection::GetOptionDouble(const char* option, double* value,
struct AdbcError* error) {
return ADBC_STATUS_NOT_FOUND;
}

AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog,
const char* db_schema,
const char* table_name,
Expand Down Expand Up @@ -964,16 +1026,26 @@ AdbcStatusCode PostgresConnection::GetTableTypes(struct AdbcConnection* connecti
AdbcStatusCode PostgresConnection::Init(struct AdbcDatabase* database,
struct AdbcError* error) {
if (!database || !database->private_data) {
SetError(error, "%s", "[libpq] Must provide an initialized AdbcDatabase");
SetError(error, "[libpq] Must provide an initialized AdbcDatabase");
return ADBC_STATUS_INVALID_ARGUMENT;
}
database_ =
*reinterpret_cast<std::shared_ptr<PostgresDatabase>*>(database->private_data);
type_resolver_ = database_->type_resolver();
return database_->Connect(&conn_, error);
RAISE_ADBC(database_->Connect(&conn_, error));
cancel_ = PQgetCancel(conn_);
if (!cancel_) {
SetError(error, "[libpq] Could not initialize PGcancel");
return ADBC_STATUS_UNKNOWN;
}
return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresConnection::Release(struct AdbcError* error) {
if (cancel_) {
PQfreeCancel(cancel_);
cancel_ = nullptr;
}
if (conn_) {
return database_->Disconnect(&conn_, error);
}
Expand Down Expand Up @@ -1023,8 +1095,35 @@ AdbcStatusCode PostgresConnection::SetOption(const char* key, const char* value,
autocommit_ = autocommit;
}
return ADBC_STATUS_OK;
} else if (std::strcmp(key, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) {
// PostgreSQL doesn't accept a parameter here
PqResultHelper result_helper{
conn_, std::string("SET search_path TO ") + value, {}, error};
RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
return ADBC_STATUS_OK;
}
SetError(error, "%s%s", "[libpq] Unknown option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresConnection::SetOptionBytes(const char* key, const uint8_t* value,
size_t length,
struct AdbcError* error) {
SetError(error, "%s%s", "[libpq] Unknown option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresConnection::SetOptionDouble(const char* key, double value,
struct AdbcError* error) {
SetError(error, "%s%s", "[libpq] Unknown option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresConnection::SetOptionInt(const char* key, int64_t value,
struct AdbcError* error) {
SetError(error, "%s%s", "[libpq] Unknown option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

} // namespace adbcpq
17 changes: 16 additions & 1 deletion c/driver/postgresql/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ namespace adbcpq {
class PostgresDatabase;
class PostgresConnection {
public:
PostgresConnection() : database_(nullptr), conn_(nullptr), autocommit_(true) {}
PostgresConnection()
: database_(nullptr), conn_(nullptr), cancel_(nullptr), autocommit_(true) {}

AdbcStatusCode Cancel(struct AdbcError* error);
AdbcStatusCode Commit(struct AdbcError* error);
AdbcStatusCode GetInfo(struct AdbcConnection* connection, uint32_t* info_codes,
size_t info_codes_length, struct ArrowArrayStream* out,
Expand All @@ -40,6 +42,14 @@ class PostgresConnection {
const char* table_name, const char** table_types,
const char* column_name, struct ArrowArrayStream* out,
struct AdbcError* error);
AdbcStatusCode GetOption(const char* option, char* value, size_t* length,
struct AdbcError* error);
AdbcStatusCode GetOptionBytes(const char* option, uint8_t* value, size_t* length,
struct AdbcError* error);
AdbcStatusCode GetOptionDouble(const char* option, double* value,
struct AdbcError* error);
AdbcStatusCode GetOptionInt(const char* option, int64_t* value,
struct AdbcError* error);
AdbcStatusCode GetTableSchema(const char* catalog, const char* db_schema,
const char* table_name, struct ArrowSchema* schema,
struct AdbcError* error);
Expand All @@ -49,6 +59,10 @@ class PostgresConnection {
AdbcStatusCode Release(struct AdbcError* error);
AdbcStatusCode Rollback(struct AdbcError* error);
AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error);
AdbcStatusCode SetOptionBytes(const char* key, const uint8_t* value, size_t length,
struct AdbcError* error);
AdbcStatusCode SetOptionDouble(const char* key, double value, struct AdbcError* error);
AdbcStatusCode SetOptionInt(const char* key, int64_t value, struct AdbcError* error);

PGconn* conn() const { return conn_; }
const std::shared_ptr<PostgresTypeResolver>& type_resolver() const {
Expand All @@ -60,6 +74,7 @@ class PostgresConnection {
std::shared_ptr<PostgresDatabase> database_;
std::shared_ptr<PostgresTypeResolver> type_resolver_;
PGconn* conn_;
PGcancel* cancel_;
bool autocommit_;
};
} // namespace adbcpq
35 changes: 35 additions & 0 deletions c/driver/postgresql/database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ PostgresDatabase::PostgresDatabase() : open_connections_(0) {
}
PostgresDatabase::~PostgresDatabase() = default;

AdbcStatusCode PostgresDatabase::GetOption(const char* option, char* value,
size_t* length, struct AdbcError* error) {
return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode PostgresDatabase::GetOptionBytes(const char* option, uint8_t* value,
size_t* length, struct AdbcError* error) {
return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode PostgresDatabase::GetOptionInt(const char* option, int64_t* value,
struct AdbcError* error) {
return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode PostgresDatabase::GetOptionDouble(const char* option, double* value,
struct AdbcError* error) {
return ADBC_STATUS_NOT_FOUND;
}

AdbcStatusCode PostgresDatabase::Init(struct AdbcError* error) {
// Connect to validate the parameters.
return RebuildTypeResolver(error);
Expand All @@ -61,6 +78,24 @@ AdbcStatusCode PostgresDatabase::SetOption(const char* key, const char* value,
return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresDatabase::SetOptionBytes(const char* key, const uint8_t* value,
size_t length, struct AdbcError* error) {
SetError(error, "%s%s", "[libpq] Unknown option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresDatabase::SetOptionDouble(const char* key, double value,
struct AdbcError* error) {
SetError(error, "%s%s", "[libpq] Unknown option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresDatabase::SetOptionInt(const char* key, int64_t value,
struct AdbcError* error) {
SetError(error, "%s%s", "[libpq] Unknown option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresDatabase::Connect(PGconn** conn, struct AdbcError* error) {
if (uri_.empty()) {
SetError(error, "%s",
Expand Down
12 changes: 12 additions & 0 deletions c/driver/postgresql/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,19 @@ class PostgresDatabase {

AdbcStatusCode Init(struct AdbcError* error);
AdbcStatusCode Release(struct AdbcError* error);
AdbcStatusCode GetOption(const char* option, char* value, size_t* length,
struct AdbcError* error);
AdbcStatusCode GetOptionBytes(const char* option, uint8_t* value, size_t* length,
struct AdbcError* error);
AdbcStatusCode GetOptionDouble(const char* option, double* value,
struct AdbcError* error);
AdbcStatusCode GetOptionInt(const char* option, int64_t* value,
struct AdbcError* error);
AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error);
AdbcStatusCode SetOptionBytes(const char* key, const uint8_t* value, size_t length,
struct AdbcError* error);
AdbcStatusCode SetOptionDouble(const char* key, double value, struct AdbcError* error);
AdbcStatusCode SetOptionInt(const char* key, int64_t value, struct AdbcError* error);

// Internal implementation

Expand Down
8 changes: 6 additions & 2 deletions c/driver/postgresql/postgres_copy_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -843,12 +843,13 @@ static inline ArrowErrorCode MakeCopyFieldReader(const PostgresType& pg_type,

class PostgresCopyStreamReader {
public:
ArrowErrorCode Init(const PostgresType& pg_type) {
ArrowErrorCode Init(PostgresType pg_type) {
if (pg_type.type_id() != PostgresTypeId::kRecord) {
return EINVAL;
}

root_reader_.Init(pg_type);
pg_type_ = std::move(pg_type);
root_reader_.Init(pg_type_);
array_size_approx_bytes_ = 0;
return NANOARROW_OK;
}
Expand Down Expand Up @@ -972,7 +973,10 @@ class PostgresCopyStreamReader {
return NANOARROW_OK;
}

const PostgresType& pg_type() const { return pg_type_; }

private:
PostgresType pg_type_;
PostgresCopyFieldTupleReader root_reader_;
nanoarrow::UniqueSchema schema_;
nanoarrow::UniqueArray array_;
Expand Down
Loading

0 comments on commit d1f8890

Please sign in to comment.