-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Error not detected in multi-statement vertica query #32993
Comments
@darkag Since you mentioned the dependent python client has issues, would you have a suggestion on how this could be tackled? |
More or less, as far as I understand the probleme, the only way to solve this problem is to modify the fetch_all_handler like this: def cust_fetch_all_handler(cursor) -> list[tuple] | None:
"""Handler for DbApiHook.run() to return results."""
if not hasattr(cursor, "description"):
raise RuntimeError(
"The database we interact with does not support DBAPI 2.0. Use operator and "
"handlers that are specifically designed for your database."
)
if cursor.description is not None:
to_return = cursor.fetchall()
while cursor.nextset(): #loop on all statement result
cursor.fetchall()
return to_return
else:
return None In this example I keep the first fetch_all result to return it (to not break the current behaviour). what I don't know is how to pass this handler only for vertica since I don't know what can be the consequences of this modification for other providers. |
Yes, it will affect other providers as the same base class/hook is used by others. Looks like the So you can override this I have not used Vertica. But if you feel that is what is needed, please open a PR with that change and someone will be able to review your change. |
It seems that operator run directly the hook's run function passing the fetch_all_handler as default, but it should work doing something like this for vertica hook: from airflow.providers.common.sql.hooks.sql import DbApiHook, fetch_all_handler
def vertica_fetch_all_handler(cursor) -> list[tuple] | None:
to_return = fetch_all_handler(cursor)
while cursor.nextset(): #loop on all statement result to get errors
cursor.fetchall()
return to_return
class VerticaHook(DbApiHook):
"""Interact with Vertica."""
def run(
self,
sql: str | Iterable[str],
autocommit: bool = False,
parameters: Iterable | Mapping | None = None,
handler: Callable | None = None,
split_statements: bool = False,
return_last: bool = True,
) -> Any | list[Any] | None:
if(handler == fetch_all_handler):
handler = vertica_fetch_all_handler
return DbApiHook.run(self, sql, autocommit, parameters, split_statements, return_last) I'll try to make test with this code tomorrow. |
While testing a possible fix I've found an other issue with the vertica client. Since the way I will fix the issue with operator may depend of a future fix for the above issue I'll wait before make a PR. (the code provided earlier will work if you add the missing handler parameter in the DbApiHook.run call but I may not have to call fetchall for all the remaining result set) |
Apache Airflow version
2.6.3
What happened
Hello,
There is a problem with multi-statement query and vertica, error will be detected only if it happens on the first statement of the sql.
for example if I run the following sql with default SQLExecuteQueryOperator options:
INSERT INTO MyTable (Key, Label) values (1, 'test 1');
INSERT INTO MyTable (Key, Label) values (1, 'test 2');
INSERT INTO MyTable (Key, Label) values (3, 'test 3');
the first insert will be commited, the nexts won't and no errors will be returned.
the same sql runed on mysql will return an error and no row will be inserted.
It seems to be linked to the way the vertica python client works (an issue has been opened on their git 4 years ago, Duplicate key values error is not thrown as exception and is getting ignored) but since a workaround was provided I don't think it will be fixed in a near future.
For the moment, to workaroud I use the split statement option with disabling auto-commit but I think it's dangerous to let this behaviour as is.
What you think should happen instead
No response
How to reproduce
create a table MyTable with two columns Key and Lbl, declare Key as primary key.
Run the following query with SQLExecuteQueryOperator
INSERT INTO MyTable (Key, Label) values (1, 'test 1');
INSERT INTO MyTable (Key, Label) values (1, 'test 2');
INSERT INTO MyTable (Key, Label) values (3, 'test 3');
Operating System
Debian GNU/Linux 11 (bullseye)
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
Code of Conduct
The text was updated successfully, but these errors were encountered: