
feat/databricks delta table destination connector #318

Open · wants to merge 8 commits into main
Conversation

@rbiseck3 (Collaborator) commented Dec 20, 2024

Description

Adds support for writing to Databricks Delta tables in two ways:

  • Adds a Databricks Delta table destination connector, building on the existing SQL foundation.
  • Adds support for supplemental table migration in the Databricks Volumes connector.

@rbiseck3 changed the title from "feat/databricks delta table" to "feat/databricks delta table destination connector" on Dec 20, 2024

@micmarty-deepsense (Contributor) commented:

It seems that the destination entry in `unstructured_ingest/v2/processes/connectors/sql/__init__.py` is missing.

@@ -406,7 +422,7 @@ def upload_dataframe(self, df: pd.DataFrame, file_data: FileData) -> None:
             cursor.executemany(stmt, values)

     def get_table_columns(self) -> list[str]:
-        with self.connection_config.get_cursor() as cursor:
+        with self.get_cursor() as cursor:
             cursor.execute(f"SELECT * from {self.upload_config.table_name}")
@micmarty-deepsense (Contributor) commented Jan 2, 2025

I know this line hasn't changed, but I think it's worth double-checking to ensure it's the solution we want to keep. Using SELECT * doesn't seem optimal for just fetching column descriptions (I might be wrong). Perhaps we could opt for something like this:

cursor.execute("DESCRIBE `some_table_name`")

which produces output like this:

[Row(col_name='id', data_type='string', comment=None),
 Row(col_name='record_id', data_type='string', comment=None),
 Row(col_name='element_id', data_type='string', comment=None),
 Row(col_name='text', data_type='string', comment=None),
 Row(col_name='embeddings', data_type='array<float>', comment=None),
 Row(col_name='type', data_type='string', comment=None),
 Row(col_name='url', data_type='string', comment=None),
 Row(col_name='version', data_type='string', comment=None),
 Row(col_name='data_source_date_created', data_type='timestamp', comment=None),
 Row(col_name='data_source_date_modified', data_type='timestamp', comment=None),
 Row(col_name='data_source_date_processed', data_type='timestamp', comment=None),
 Row(col_name='data_source_permissions_data', data_type='string', comment=None),
 Row(col_name='data_source_filesize_bytes', data_type='float', comment=None),
 Row(col_name='data_source_url', data_type='string', comment=None),
 Row(col_name='data_source_version', data_type='string', comment=None),
 Row(col_name='data_source_record_locator', data_type='string', comment=None),
 Row(col_name='category_depth', data_type='int', comment=None),
 Row(col_name='parent_id', data_type='string', comment=None),
 Row(col_name='attached_filename', data_type='string', comment=None),
 Row(col_name='filetype', data_type='string', comment=None),
 Row(col_name='last_modified', data_type='timestamp', comment=None),
 Row(col_name='file_directory', data_type='string', comment=None),
 Row(col_name='filename', data_type='string', comment=None),
 Row(col_name='languages', data_type='array<string>', comment=None),
 Row(col_name='page_number', data_type='string', comment=None),
 Row(col_name='links', data_type='string', comment=None),
 Row(col_name='page_name', data_type='string', comment=None),
 Row(col_name='link_urls', data_type='string', comment=None),
 Row(col_name='link_texts', data_type='string', comment=None),
 Row(col_name='sent_from', data_type='string', comment=None),
 Row(col_name='sent_to', data_type='string', comment=None),
 Row(col_name='subject', data_type='string', comment=None),
 Row(col_name='section', data_type='string', comment=None),
 Row(col_name='header_footer_type', data_type='string', comment=None),
 Row(col_name='emphasized_text_contents', data_type='string', comment=None),
 Row(col_name='emphasized_text_tags', data_type='string', comment=None),
 Row(col_name='text_as_html', data_type='string', comment=None),
 Row(col_name='regex_metadata', data_type='string', comment=None),
 Row(col_name='detection_class_prob', data_type='float', comment=None),
 Row(col_name='is_continuation', data_type='boolean', comment=None),
 Row(col_name='orig_elements', data_type='string', comment=None),
 Row(col_name='coordinates_points', data_type='string', comment=None),
 Row(col_name='coordinates_system', data_type='string', comment=None),
 Row(col_name='coordinates_layout_width', data_type='float', comment=None),
 Row(col_name='coordinates_layout_height', data_type='float', comment=None)]

@rbiseck3 (Author) replied:

I don't actually know when this was introduced, and I'm more than happy to refactor to something more optimal for fetching column data.

last_modified TIMESTAMP,
file_directory STRING,
filename STRING,
languages ARRAY<STRING>,
Contributor commented:

Arrays are not correctly inserted: `prepare_data` converts lists into strings.

[upload] [DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION] Cannot resolve "languages" due to data type mismatch: cannot cast "STRING" to "ARRAY<STRING>". SQLSTATE: 42K09; line 1 pos 0

The solution is either to change the SQL insert statement so that ARRAY can be used as a keyword, or to change the column type to STRING (which worked for me).
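A sketch of the STRING workaround, as a hypothetical helper (the PR's actual `prepare_data` may differ): serialize list values to JSON strings before binding them, so they fit a STRING column and avoid the ARRAY<STRING> cast error above.

```python
import json


def prepare_value(value):
    # Lists (e.g. the `languages` field) can't be bound as ARRAY<STRING>
    # parameters, so serialize them to a JSON string and store them in a
    # STRING column instead. Scalars pass through unchanged.
    if isinstance(value, (list, tuple)):
        return json.dumps(list(value))
    return value
```

The consumer can later recover the list with `json.loads` if needed.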


Contributor commented:

Unfortunately, this SDK does not seem to support array parameters.

I don't see any other way that would be safe (not prone to SQL injection) and would still allow using `cursor.execute` with a list of parameters.
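One pattern that might be worth exploring (an untested assumption on my part, not something confirmed in this thread): keep the values parameterized for safety, pass arrays as JSON strings, and let the server cast them back with Databricks SQL's `from_json`. A hypothetical statement builder:

```python
def build_insert(table_name: str, columns: list[str], array_columns: set) -> str:
    # Every value stays a bound parameter (no string interpolation of data,
    # so no SQL injection risk); columns known to be arrays get wrapped in
    # from_json so the server casts the JSON string back to array<string>.
    placeholders = [
        "from_json(?, 'array<string>')" if col in array_columns else "?"
        for col in columns
    ]
    return (
        f"INSERT INTO `{table_name}` ({', '.join(columns)}) "
        f"VALUES ({', '.join(placeholders)})"
    )
```

For example, `build_insert("t", ["id", "languages"], {"languages"})` produces a statement with a plain `?` for `id` and a `from_json(?, 'array<string>')` wrapper for `languages`.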

@rbiseck3 (Author) replied:

Yeah, I've reached out to a contact at Databricks. Hopefully we'll hear about a workaround; otherwise this entire approach might need to be redone.

@@ -129,8 +129,13 @@ class SQLIndexer(Indexer, ABC):
connection_config: SQLConnectionConfig
Contributor commented:

(comment not tied to any particular line, but relevant to this file)
I was getting "column does not exist" errors because the schema expects a number of columns to have the `data_source_` prefix.

We'd need to adjust column names here and there, I guess.
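A hypothetical adjustment, assuming the mismatch is exactly a missing `data_source_` prefix on the indexer's metadata fields (helper name and field set are illustrative, not from the PR):

```python
def add_data_source_prefix(row: dict, fields: set) -> dict:
    # Rename e.g. "date_created" -> "data_source_date_created" so the row's
    # keys match the destination schema's column names; other keys pass through.
    return {
        (f"data_source_{key}" if key in fields else key): value
        for key, value in row.items()
    }
```

E.g. `add_data_source_prefix({"date_created": "2024", "text": "hi"}, {"date_created"})` yields `{"data_source_date_created": "2024", "text": "hi"}`.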

@rbiseck3 (Author) commented Jan 2, 2025

@micmarty-deepsense this PR is still in progress while we wait to hear back from Databricks about a solution for variable binding with list data.

@micmarty-deepsense (Contributor) commented Jan 3, 2025

I've also run into errors saying that Timestamp is not JSON serializable. To fix it, change this line to:

json.dump(data, f, indent=2, default=str)

...but I believe this isn't the right place for that fix 😛

Here's my temporary dev branch that attempts to naively address some of the obstacles I faced while testing this PR.
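A quick self-contained demonstration of why `default=str` unblocks serialization (a plain `datetime` stands in for pandas `Timestamp` here, which fails the same way):

```python
import json
from datetime import datetime

data = {"last_modified": datetime(2025, 1, 3, 12, 0, 0)}

# Without default=, json.dumps raises:
#   TypeError: Object of type datetime is not JSON serializable
# default=str tells the encoder to stringify any type it doesn't know.
serialized = json.dumps(data, indent=2, default=str)
```

The timestamp ends up as the string "2025-01-03 12:00:00" in the output, which is lossy but good enough for local staging files.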
