Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(c/driver/postgresql,python): implement error_details spec #946

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions c/driver/common/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ static void ReleaseError(struct AdbcError* error) {
}

void SetError(struct AdbcError* error, const char* format, ...) {
va_list args;
va_start(args, format);
SetErrorVariadic(error, format, args);
va_end(args);
}

void SetErrorVariadic(struct AdbcError* error, const char* format, va_list args) {
if (!error) return;
if (error->release) {
// TODO: combine the errors if possible
Expand All @@ -44,10 +51,7 @@ void SetError(struct AdbcError* error, const char* format, ...) {

error->release = &ReleaseError;

va_list args;
va_start(args, format);
vsnprintf(error->message, kErrorBufferSize, format, args);
va_end(args);
}

struct SingleBatchArrayStream {
Expand Down
3 changes: 3 additions & 0 deletions c/driver/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>

Expand All @@ -39,6 +40,8 @@ extern "C" {
void SetError(struct AdbcError* error, const char* format,
...) ADBC_CHECK_PRINTF_ATTRIBUTE;

void SetErrorVariadic(struct AdbcError* error, const char* format, va_list args);

struct StringBuilder {
char* buffer;
// Not including null terminator
Expand Down
1 change: 1 addition & 0 deletions c/driver/postgresql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ add_arrow_lib(adbc_driver_postgresql
SOURCES
connection.cc
database.cc
error.cc
postgresql.cc
statement.cc
OUTPUTS
Expand Down
8 changes: 4 additions & 4 deletions c/driver/postgresql/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ struct PqRecord {
};

// Used by PqResultHelper to provide index-based access to the records within each
// row of a pg_result
// row of a PGresult
class PqResultRow {
public:
PqResultRow(pg_result* result, int row_num) : result_(result), row_num_(row_num) {
PqResultRow(PGresult* result, int row_num) : result_(result), row_num_(row_num) {
ncols_ = PQnfields(result);
}

Expand All @@ -69,7 +69,7 @@ class PqResultRow {
}

private:
pg_result* result_ = nullptr;
PGresult* result_ = nullptr;
int row_num_;
int ncols_;
};
Expand Down Expand Up @@ -167,7 +167,7 @@ class PqResultHelper {
iterator end() { return iterator(*this, NumRows()); }

private:
pg_result* result_ = nullptr;
PGresult* result_ = nullptr;
PGconn* conn_;
std::string query_;
std::vector<std::string> param_values_;
Expand Down
3 changes: 3 additions & 0 deletions c/driver/postgresql/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <adbc.h>
#include <libpq-fe.h>

#include "error.h"
#include "postgres_type.h"

namespace adbcpq {
Expand Down Expand Up @@ -69,10 +70,12 @@ class PostgresConnection {
return type_resolver_;
}
bool autocommit() const { return autocommit_; }
ErrorDetailsState* error_details() { return &error_details_; }

private:
std::shared_ptr<PostgresDatabase> database_;
std::shared_ptr<PostgresTypeResolver> type_resolver_;
ErrorDetailsState error_details_;
PGconn* conn_;
PGcancel* cancel_;
bool autocommit_;
Expand Down
11 changes: 6 additions & 5 deletions c/driver/postgresql/database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <adbc.h>
#include <libpq-fe.h>
#include <nanoarrow/nanoarrow.h>
#include <postgresql/error.h>

#include "common/utils.h"

Expand Down Expand Up @@ -125,10 +126,10 @@ AdbcStatusCode PostgresDatabase::Disconnect(PGconn** conn, struct AdbcError* err

// Helpers for building the type resolver from queries
static inline int32_t InsertPgAttributeResult(
pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver);
PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver);

static inline int32_t InsertPgTypeResult(
pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver);
PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver);

AdbcStatusCode PostgresDatabase::RebuildTypeResolver(struct AdbcError* error) {
PGconn* conn = nullptr;
Expand Down Expand Up @@ -177,7 +178,7 @@ ORDER BY
auto resolver = std::make_shared<PostgresTypeResolver>();

// Insert record type definitions (this includes table schemas)
pg_result* result = PQexec(conn, kColumnsQuery.c_str());
PGresult* result = PQexec(conn, kColumnsQuery.c_str());
ExecStatusType pq_status = PQresultStatus(result);
if (pq_status == PGRES_TUPLES_OK) {
InsertPgAttributeResult(result, resolver);
Expand Down Expand Up @@ -222,7 +223,7 @@ ORDER BY
}

static inline int32_t InsertPgAttributeResult(
pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
int num_rows = PQntuples(result);
std::vector<std::pair<std::string, uint32_t>> columns;
uint32_t current_type_oid = 0;
Expand Down Expand Up @@ -254,7 +255,7 @@ static inline int32_t InsertPgAttributeResult(
}

static inline int32_t InsertPgTypeResult(
pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
int num_rows = PQntuples(result);
PostgresTypeResolver::Item item;
int32_t n_added = 0;
Expand Down
189 changes: 189 additions & 0 deletions c/driver/postgresql/error.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "error.h"

#include <iostream>

#include <adbc.h>
#include <sys/errno.h>
#include <cstring>

#include <common/utils.h>

namespace adbcpq {

void SetErrorPgResult(struct AdbcError* error, PGresult* result) {
if (!error) return;
const char* sqlstate = PQresultErrorField(result, PG_DIAG_SQLSTATE);
if (sqlstate) {
std::strncpy(error->sqlstate, sqlstate, 5);
}
}

int AdbcStatusCodeToErrno(AdbcStatusCode code) {
switch (code) {
case ADBC_STATUS_OK:
return 0;
case ADBC_STATUS_UNKNOWN:
return EIO;
case ADBC_STATUS_NOT_IMPLEMENTED:
return ENOTSUP;
case ADBC_STATUS_NOT_FOUND:
return ENOENT;
case ADBC_STATUS_ALREADY_EXISTS:
return EEXIST;
case ADBC_STATUS_INVALID_ARGUMENT:
case ADBC_STATUS_INVALID_STATE:
return EINVAL;
case ADBC_STATUS_INVALID_DATA:
case ADBC_STATUS_INTEGRITY:
case ADBC_STATUS_INTERNAL:
case ADBC_STATUS_IO:
return EIO;
case ADBC_STATUS_CANCELLED:
return ECANCELED;
case ADBC_STATUS_TIMEOUT:
return ETIMEDOUT;
case ADBC_STATUS_UNAUTHENTICATED:
return EAUTH;
case ADBC_STATUS_UNAUTHORIZED:
return EACCES;
default:
return EIO;
}
}

void ErrorDetailsState::SetError(struct AdbcError* error, const char* format, ...) {
va_list args;
va_start(args, format);
::SetErrorVariadic(error, format, args);
va_end(args);

std::lock_guard<std::mutex> guard(mutex_);
details_.clear();
}

AdbcStatusCode ErrorDetailsState::SetError(struct AdbcError* error, PGresult* result, const char* format, ...) {
va_list args;
va_start(args, format);
::SetErrorVariadic(error, format, args);
va_end(args);

return SetDetail(error, result);
}

void ErrorDetailsState::SetDetail(std::string key, std::string value) {
std::lock_guard<std::mutex> guard(mutex_);
details_.emplace_back(std::move(key), std::move(value));
}

AdbcStatusCode ErrorDetailsState::SetDetail(struct AdbcError* error, PGresult* result) {
std::lock_guard<std::mutex> guard(mutex_);
const char* sqlstate = PQresultErrorField(result, PG_DIAG_SQLSTATE);
if (error) {
if (sqlstate) {
static_assert(sizeof(error->sqlstate) == 5, "");
std::strncpy(error->sqlstate, sqlstate, sizeof(error->sqlstate));
}
}

AdbcStatusCode code = ADBC_STATUS_IO;
if (sqlstate) {
// Duplicate SQLSTATE since in the context of an ArrowArrayStream,
// we have no way to return it through an AdbcError
details_.emplace_back("SQLSTATE", sqlstate);

// https://www.postgresql.org/docs/current/errcodes-appendix.html
// We can extend this in the future
if (std::strcmp(sqlstate, "57014") == 0) {
code = ADBC_STATUS_CANCELLED;
}
}
const char* primary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
if (primary) {
details_.emplace_back("PG_DIAG_MESSAGE_PRIMARY", primary);
}
const char* detail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL);
if (detail) {
details_.emplace_back("PG_DIAG_MESSAGE_DETAIL", detail);
}
const char* hint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT);
if (hint) {
details_.emplace_back("PG_DIAG_MESSAGE_HINT", hint);
}
const char* context = PQresultErrorField(result, PG_DIAG_CONTEXT);
if (context) {
details_.emplace_back("PG_DIAG_CONTEXT", context);
}
const char* source_file = PQresultErrorField(result, PG_DIAG_SOURCE_FILE);
if (source_file) {
details_.emplace_back("PG_DIAG_SOURCE_FILE", source_file);
}
const char* source_line = PQresultErrorField(result, PG_DIAG_SOURCE_LINE);
if (source_line) {
details_.emplace_back("PG_DIAG_SOURCE_LINE", source_line);
}
// There are many other fields in PQresultErrorField that we could extract
return code;
}

void ErrorDetailsState::Clear() {
std::lock_guard<std::mutex> guard(mutex_);
details_.clear();
}

static const int kErrorDetailsPrefixLen = std::strlen(ADBC_OPTION_ERROR_DETAILS_PREFIX);

bool ErrorDetailsState::GetOption(const char* key, std::string* value) {
if (std::strncmp(key, ADBC_OPTION_ERROR_DETAILS_PREFIX,
kErrorDetailsPrefixLen) == 0) {
int64_t index = std::strtol(key + kErrorDetailsPrefixLen, nullptr, 10);
if (errno != 0 || index < 0) return false;
std::lock_guard<std::mutex> guard(mutex_);
if (static_cast<size_t>(index) >= details_.size()) return false;

*value = details_[index].first;
return true;
}
return false;
}

bool ErrorDetailsState::GetOptionBytes(const char* key, std::string* value) {
if (std::strncmp(key, ADBC_OPTION_ERROR_DETAILS_PREFIX,
kErrorDetailsPrefixLen) == 0) {
int64_t index = std::strtol(key + kErrorDetailsPrefixLen, nullptr, 10);
if (errno != 0 || index < 0) return false;
std::lock_guard<std::mutex> guard(mutex_);
if (static_cast<size_t>(index) >= details_.size()) return false;

*value = details_[index].second;
return true;
}
return false;
}

bool ErrorDetailsState::GetOptionInt(const char* key, int64_t* value) {
if (std::strcmp(key, ADBC_OPTION_ERROR_DETAILS) == 0) {
std::lock_guard<std::mutex> guard(mutex_);
*value = static_cast<int64_t>(details_.size());
return true;
}
return false;
}

}
Loading
Loading