Skip to content

Commit

Permalink
Support multi-schemas for refresh endpoint
Browse files (browse the repository at this point in the history)
  • Loading branch information
jcjc712 committed Apr 19, 2024
1 parent 27a5bfc commit cf1b27a
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 27 deletions.
24 changes: 18 additions & 6 deletions dataherald/api/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,30 @@ def refresh_table_description(
db_connection = db_connection_repository.find_by_id(
refresh_table_description.db_connection_id
)

scanner = self.system.instance(Scanner)
database_connection_service = DatabaseConnectionService(scanner, self.storage)
try:
sql_database = SQLDatabase.get_sql_engine(db_connection, True)
tables = sql_database.get_tables_and_views()
data = {}
if db_connection.schemas:
for schema in db_connection.schemas:
sql_database = database_connection_service.get_sql_database(
db_connection, schema
)
if schema not in data.keys():
data[schema] = []
data[schema] = sql_database.get_tables_and_views()
else:
sql_database = database_connection_service.get_sql_database(
db_connection
)
data[None] = sql_database.get_tables_and_views()

# Get the tables and views, create table descriptions for missing ones as NOT_SCANNED, and mark those no longer present as DEPRECATED
scanner_repository = TableDescriptionRepository(self.storage)
scanner = self.system.instance(Scanner)

return [
TableDescriptionResponse(**record.dict())
for record in scanner.refresh_tables(
tables, str(db_connection.id), scanner_repository
data, str(db_connection.id), scanner_repository
)
]
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion dataherald/db_scanner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def create_tables(
@abstractmethod
def refresh_tables(
self,
tables: list[str],
schemas_and_tables: dict[str, list],
db_connection_id: str,
repository: TableDescriptionRepository,
metadata: dict = None,
Expand Down
44 changes: 24 additions & 20 deletions dataherald/db_scanner/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,34 +62,38 @@ def create_tables(
@override
def refresh_tables(
self,
tables: list[str],
schemas_and_tables: dict[str, list],
db_connection_id: str,
repository: TableDescriptionRepository,
metadata: dict = None,
) -> list[TableDescription]:
stored_tables = repository.find_by({"db_connection_id": str(db_connection_id)})
stored_tables_list = [table.table_name for table in stored_tables]

rows = []
for table_description in stored_tables:
if table_description.table_name not in tables:
table_description.status = TableDescriptionStatus.DEPRECATED.value
rows.append(repository.save_table_info(table_description))
else:
rows.append(TableDescription(**table_description.dict()))
for schema, tables in schemas_and_tables.items():
stored_tables = repository.find_by(
{"db_connection_id": str(db_connection_id), "schema": schema}
)
stored_tables_list = [table.table_name for table in stored_tables]

for table in tables:
if table not in stored_tables_list:
rows.append(
repository.save_table_info(
TableDescription(
db_connection_id=db_connection_id,
table_name=table,
status=TableDescriptionStatus.NOT_SCANNED.value,
metadata=metadata,
for table_description in stored_tables:
if table_description.table_name not in tables:
table_description.status = TableDescriptionStatus.DEPRECATED.value
rows.append(repository.save_table_info(table_description))
else:
rows.append(TableDescription(**table_description.dict()))

for table in tables:
if table not in stored_tables_list:
rows.append(
repository.save_table_info(
TableDescription(
db_connection_id=db_connection_id,
table_name=table,
status=TableDescriptionStatus.NOT_SCANNED.value,
metadata=metadata,
schema_name=schema,
)
)
)
)
return rows

@override
Expand Down

0 comments on commit cf1b27a

Please sign in to comment.