ResultSet fetchmany_arrow/fetchall_arrow methods fail during concat_tables #418

Open
ksofeikov opened this issue Jul 29, 2024 · 10 comments

ksofeikov commented Jul 29, 2024

Hi there,

I'm using this client library to fetch large amounts of data from our DBX environment. The version I'm using is 3.3.0.

The library keeps crashing when it attempts to concatenate the current and partial results. I cannot attach the full trace because it contains some of our internal schemas, but here is the gist of it:

creation_date
First Schema: creation_date: timestamp[us, tz=Etc/UTC]
Second Schema: creation_date: timestamp[us, tz=Etc/UTC] not null

status
First Schema: status: string
Second Schema: status: string not null

sender_id
First Schema: sender_id: string
Second Schema: sender_id: string not null

...and a few other fields with the exact same discrepancy.

The exact stack trace is

Traceback (most recent call last):
  File "/Users/X/work/scripts/raw_order.py", line 34, in <module>
    for r in tqdm(cursor, total=max_items):
  File "/Users/X/work/.venv/lib/python3.10/site-packages/tqdm/std.py", line 1181, in __iter__
    for obj in iterable:
  File "/Users/X/work//.venv/lib/python3.10/site-packages/databricks/sql/client.py", line 422, in __iter__
    for row in self.active_result_set:
  File "/Users/X/work//.venv/lib/python3.10/site-packages/databricks/sql/client.py", line 1112, in __iter__
    row = self.fetchone()
  File "/Users/X/work/.venv/lib/python3.10/site-packages/databricks/sql/client.py", line 1217, in fetchone
    res = self._convert_arrow_table(self.fetchmany_arrow(1))
  File "/Users/X/work/.venv/lib/python3.10/site-packages/databricks/sql/client.py", line 1193, in fetchmany_arrow
    results = pyarrow.concat_tables([results, partial_results])
  File "pyarrow/table.pxi", line 5962, in pyarrow.lib.concat_tables
  File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Schema at index 1 was different: 
GOES INTO DETAILS SPECIFIED ABOVE AS TO WHAT IS DIFFERENT

The data is coming through a cursor like this

connection = sql.connect(
    server_hostname="X",
    http_path="B",
    access_token=app_settings.dbx_access_token,
)

cursor = connection.cursor()

max_items = 100000
batch_size = 10000

cursor.execute(
    f"SELECT * from X where  creation_date between '2024-06-01' and '2024-09-01' limit {max_items}"
)
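
The iteration that eventually fails is the one visible in the traceback; roughly reconstructed from it (the row handling below is a placeholder), it looks like this:

from tqdm import tqdm

# Iterating the cursor calls fetchone(), which goes through fetchmany_arrow(1)
# and the failing pyarrow.concat_tables call (per the traceback above).
for r in tqdm(cursor, total=max_items):
    pass  # placeholder for per-row processing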

The source table is created through a CTAS statement, so all fields are nullable by default. I have found two ways to resolve the issue:

  • either call results = pyarrow.concat_tables([results, partial_results], promote_options="permissive") so that pyarrow can reconcile the two schemas (see the sketch after this list), or
  • downgrade to the latest release of the previous major version, 2.9.6.
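
A minimal sketch of the first option on a toy pair of tables (not the client's actual code; assumes pyarrow 14.0+, where promote_options is available):

import pyarrow as pa

# Two batches whose schemas differ only in nullability, mimicking the
# "string" vs "string not null" mismatch shown above.
t1 = pa.table({"status": pa.array(["ok", "failed"])})
t2 = pa.Table.from_arrays(
    [pa.array(["ok"])],
    schema=pa.schema([pa.field("status", pa.string(), nullable=False)]),
)

# pa.concat_tables([t1, t2])  # raises ArrowInvalid: Schema at index 1 was different
merged = pa.concat_tables([t1, t2], promote_options="permissive")
print(merged.schema)  # status: string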

I checked the 2.9.6 source code and it does not seem to use permissive schema casting either, so this looks like a regression.

I'm not sure if I can add anything else beyond that, but do let me know.

To be clear, I request around 100k records at a time and can iterate through roughly 95k of them before it fails, so I'm not sure there is a reliable way to reproduce this.

In case the cluster runtime matters: 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12).

Thanks!

@ksofeikov changed the title from "RsultSet fetchmany_arrow/fetchall_arrow methods fail with during concat_tables" to "RsultSet fetchmany_arrow/fetchall_arrow methods fail during concat_tables" on Jul 29, 2024
@kravets-levko (Contributor)

Hi @ksofeikov! Thank you for reporting this issue and attaching the stacktrace. Before we go deeper into fixing the code, I would like to understand why two pieces of the same result set ended up with different schemas. Is it possible to create a synthetic example that reproduces this issue, so you don't have to reveal your data? It would help us check whether promote_options="permissive" is the proper fix (or find one if needed), and also check whether our other libraries have a similar bug.

@kravets-levko changed the title from "RsultSet fetchmany_arrow/fetchall_arrow methods fail during concat_tables" to "ResultSet fetchmany_arrow/fetchall_arrow methods fail during concat_tables" on Jul 30, 2024
ksofeikov (Author) commented Jul 30, 2024

@kravets-levko tbh, I'm not sure how to create a reproducible example here, since the cursor just bulk-reads the table from the store. Since this happens during the result read-out stage, it's not really something that touches the client code or that I control.

What I can do is maybe try to stop at a breakpoint and see how the cloud fetch gets a table with a different schema.

There is another pointer I could give, I guess. Let me know if I've completely misunderstood it, but it seems like CloudFetch literally downloads files and then takes the results from them.

I remember seeing a similar pyarrow schema problem with dask when I downloaded files from AWS Athena and then tried to concatenate them through dd.read_parquet(...); it would fail internally with the same error. Back then, I figured out that if a slice of the parquet files happens to have all nulls for a column, the read fails in the same way. What seemingly helped me at the time was forcing the schema explicitly.

However, memory might be letting me down here :)

@ksofeikov (Author)

I suspect one way to create an example is to craft two arrow files with one column each, where one has no null values in the column and the other has only nulls. Then try to read those files as if they were the query result; this will probably trigger it.

@ksofeikov (Author)

Yeah, it seems the null assumption was correct. Here is a minimal working example:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
df1 = pd.DataFrame({"a": ["a", "b"]}, dtype=str)
df1.to_parquet("1.parquet")
df2 = pd.DataFrame({"a": [None, None]}, dtype=str)
df2.to_parquet("2.parquet")
t1 = pq.read_table("1.parquet")
t2 = pq.read_table("2.parquet")

pa.concat_tables([t1,t2])

will throw

ArrowInvalid: Schema at index 1 was different: 
a: string
vs
a: null
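
For what it's worth, the schema-forcing trick mentioned in the dask story above also fixes this example: reading both files against an explicit schema keeps the all-null column typed as string. A sketch, assuming pq.read_table's schema argument behaves this way here:

import pyarrow as pa
import pyarrow.parquet as pq

# Force both files onto the same schema, so the all-null column in
# 2.parquet comes back as string instead of null.
schema = pa.schema([pa.field("a", pa.string())])
t1 = pq.read_table("1.parquet", schema=schema)
t2 = pq.read_table("2.parquet", schema=schema)
print(pa.concat_tables([t1, t2]))  # a: string, with [null, null] from the second file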

ksofeikov (Author) commented Jul 30, 2024

Also, the root cause is that the strings are backed by Python str, which ends up as the object dtype in pandas. Backing them with either the nullable pandas string dtype or string[pyarrow] does not lead to an exception and reads the data fine:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
df1 = pd.DataFrame({"a": ["a", "b"]}, dtype="string")
df1.to_parquet("1.parquet")
df2 = pd.DataFrame({"a": [None, None]}, dtype="string")
df2.to_parquet("2.parquet")
t1 = pq.read_table("1.parquet")
t2 = pq.read_table("2.parquet")

pa.concat_tables([t1,t2])

generates

pyarrow.Table
a: string
----
a: [["a","b"],[null,null]]

kravets-levko (Contributor) commented Jul 30, 2024

@ksofeikov Yes, I totally understand what the error means. What I don't understand is why it happens. In your example, you literally created two different tables with different schemas. However, when you run a SQL query you get a single result set, all rows of which should have the same schema. Even if you UNION a few tables, the server should compute the common schema for the result. Even with CloudFetch it should be the same, because the server prepends the schema to each file before storing them in the cloud, and it should use the same schema for all of them.

I have some suspicions about where the schema may get changed while reading the data from the server, and I will try to reproduce your issue. Meanwhile, I want to ask you to check a few other things:

  • try different runtime (newer) if possible
  • try to run the same query with Nodejs and/or Go drivers (also, if possible)
  • try to disable CloudFetch (pass use_cloud_fetch=False to sql.connect(...), as in the sketch below) and see if anything changes
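
For reference, a sketch of that third check, reusing the placeholder connection values from the snippet above:

from databricks import sql

# Same connection as in the report, with CloudFetch disabled so results
# are fetched inline instead of via files downloaded from cloud storage.
connection = sql.connect(
    server_hostname="X",
    http_path="B",
    access_token="...",  # placeholder
    use_cloud_fetch=False,
)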

@ksofeikov (Author)

Just checked use_cloud_fetch=False: it does solve the problem. I also made sure to remove my patches allowing permissive schema merges. It works without cloud fetch and starts failing again if I enable it.

ksofeikov (Author) commented Jul 30, 2024

Changing the environment from 13.3 LTS to 14.3 LTS and 15.4 LTS seemingly does nothing to help; the code still fails if cloud fetch is enabled.

Is there a way to verify the env version on the client, just to make sure it was picked up after changing the compute on the platform?
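
One way that might work, assuming the Databricks SQL current_version() function is available on the cluster (an assumption, not something verified in this thread):

# Ask the cluster itself which runtime actually served the query.
cursor.execute("SELECT current_version()")
print(cursor.fetchone())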

Regardless, there are a couple of suitable workarounds for now, so good luck with the fix!

@kravets-levko (Contributor)

It's good that the workaround helped and that you can continue using the library while we're looking for a fix. Too bad we have another issue with CloudFetch 🤔 Anyway, thank you for the bug report and all your help, which allowed us to narrow down the scope of the issue (even though I still need to reproduce it 🙂). I'll keep you posted on any updates or other questions.

unj1m (Contributor) commented Sep 13, 2024

I just got bitten by this as well. FWIW, no pandas was involved in my use case. Just SQL.

It's good to know that disabling cloud fetch is a work-around.
