Skip to content

Commit

Permalink
fix(c/driver/postgresql): only clear schema option if needed (#1174)
Browse files Browse the repository at this point in the history
Fixes #1109.
  • Loading branch information
lidavidm authored Oct 6, 2023
1 parent 13fc595 commit ad24938
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 1 deletion.
49 changes: 49 additions & 0 deletions c/driver/postgresql/postgresql_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,55 @@ class PostgresStatementTest : public ::testing::Test,
};
ADBCV_TEST_STATEMENT(PostgresStatementTest)

TEST_F(PostgresStatementTest, SqlIngestSchema) {
const std::string schema_name = "testschema";

ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));

ASSERT_THAT(AdbcStatementSetSqlQuery(&statement,
"CREATE SCHEMA IF NOT EXISTS testschema", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));

std::string drop = "DROP TABLE IF EXISTS testschema.schematable";
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, drop.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));

{
adbc_validation::Handle<struct ArrowSchema> schema;
adbc_validation::Handle<struct ArrowArray> batch;

ArrowSchemaInit(&schema.value);
ASSERT_THAT(ArrowSchemaSetTypeStruct(&schema.value, 1), adbc_validation::IsOkErrno());
ASSERT_THAT(ArrowSchemaSetType(schema->children[0], NANOARROW_TYPE_INT64),
adbc_validation::IsOkErrno());
ASSERT_THAT(ArrowSchemaSetName(schema->children[0], "ints"),
adbc_validation::IsOkErrno());

ASSERT_THAT((adbc_validation::MakeBatch<int64_t>(
&schema.value, &batch.value, static_cast<struct ArrowError*>(nullptr),
{-1, 0, 1, std::nullopt})),
adbc_validation::IsOkErrno());

ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"schematable", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_CREATE, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_DB_SCHEMA,
schema_name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement, &batch.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
}
}

TEST_F(PostgresStatementTest, SqlIngestTemporaryTable) {
ASSERT_THAT(quirks()->DropTempTable(&connection, "temptable", &error),
IsOkStatus(&error));
Expand Down
4 changes: 3 additions & 1 deletion c/driver/postgresql/statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1300,14 +1300,16 @@ AdbcStatusCode PostgresStatement::SetOption(const char* key, const char* value,
prepared_ = false;
} else if (std::strcmp(key, ADBC_INGEST_OPTION_TEMPORARY) == 0) {
if (std::strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
// https://github.com/apache/arrow-adbc/issues/1109: only clear the
// schema if enabling since Python always sets the flag explicitly
ingest_.temporary = true;
ingest_.db_schema.clear();
} else if (std::strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
ingest_.temporary = false;
} else {
SetError(error, "[libpq] Invalid value '%s' for option '%s'", value, key);
return ADBC_STATUS_INVALID_ARGUMENT;
}
ingest_.db_schema.clear();
prepared_ = false;
} else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
int64_t int_value = std::atol(value);
Expand Down
15 changes: 15 additions & 0 deletions python/adbc_driver_postgresql/tests/test_dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,21 @@ def test_ingest(postgres: dbapi.Connection) -> None:
cur.adbc_ingest("foo", table, catalog_name="main")


def test_ingest_schema(postgres: dbapi.Connection) -> None:
table = pyarrow.Table.from_pydict({"numbers": [1, 2], "letters": ["a", "b"]})

with postgres.cursor() as cur:
cur.execute("CREATE SCHEMA IF NOT EXISTS testschema")
cur.execute("DROP TABLE IF EXISTS testschema.foo")

postgres.commit()

cur.adbc_ingest("foo", table, mode="create", db_schema_name="testschema")

cur.execute("SELECT * FROM testschema.foo ORDER BY numbers")
assert cur.fetch_arrow_table() == table


def test_ingest_temporary(postgres: dbapi.Connection) -> None:
table = pyarrow.Table.from_pydict(
{
Expand Down

0 comments on commit ad24938

Please sign in to comment.