Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/oracle): Improved foreign key handling #11867

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 66 additions & 58 deletions metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,82 +376,90 @@ def _get_constraint_data(
) -> List[sqlalchemy.engine.Row]:
params = {"table_name": table_name}

# Simplified query that's more reliable with SQLAlchemy 1.4
text = (
"SELECT"
"\nac.constraint_name," # 0
"\nac.constraint_type," # 1
"\nloc.column_name AS local_column," # 2
"\nrem.table_name AS remote_table," # 3
"\nrem.column_name AS remote_column," # 4
"\nrem.owner AS remote_owner," # 5
"\nloc.position as loc_pos," # 6
"\nrem.position as rem_pos," # 7
"\nac.search_condition," # 8
"\nac.delete_rule" # 9
"\nFROM dba_constraints%(dblink)s ac,"
"\ndba_cons_columns%(dblink)s loc,"
"\ndba_cons_columns%(dblink)s rem"
"\nWHERE ac.table_name = CAST(:table_name AS VARCHAR2(128))"
"\nac.constraint_name,"
"\nac.constraint_type,"
"\nacc.column_name AS local_column,"
"\nNULL AS remote_table,"
"\nNULL AS remote_column,"
"\nNULL AS remote_owner,"
"\nacc.position AS loc_pos,"
"\nNULL AS rem_pos,"
"\nac.search_condition,"
"\nac.delete_rule"
"\nFROM all_constraints ac"
"\nJOIN all_cons_columns acc"
"\nON ac.owner = acc.owner"
"\nAND ac.constraint_name = acc.constraint_name"
"\nAND ac.table_name = acc.table_name"
"\nWHERE ac.table_name = :table_name"
"\nAND ac.constraint_type IN ('R','P', 'U', 'C')"
)

if schema is not None:
params["owner"] = schema
text += "\nAND ac.owner = CAST(:owner AS VARCHAR2(128))"
text += "\nAND ac.owner = :owner"

# For foreign keys, join with the remote columns
text += (
"\nAND ac.owner = loc.owner"
"\nAND ac.constraint_name = loc.constraint_name"
"\nAND ac.r_owner = rem.owner(+)"
"\nAND ac.r_constraint_name = rem.constraint_name(+)"
"\nAND (rem.position IS NULL or loc.position=rem.position)"
"\nORDER BY ac.constraint_name, loc.position"
"\nUNION ALL"
"\nSELECT"
"\nac.constraint_name,"
"\nac.constraint_type,"
"\nacc.column_name AS local_column,"
"\nac.r_table_name AS remote_table,"
"\nrcc.column_name AS remote_column,"
"\nac.r_owner AS remote_owner,"
"\nacc.position AS loc_pos,"
"\nrcc.position AS rem_pos,"
"\nac.search_condition,"
"\nac.delete_rule"
"\nFROM all_constraints ac"
"\nJOIN all_cons_columns acc"
"\nON ac.owner = acc.owner"
"\nAND ac.constraint_name = acc.constraint_name"
"\nAND ac.table_name = acc.table_name"
"\nLEFT JOIN all_cons_columns rcc"
"\nON ac.r_owner = rcc.owner"
"\nAND ac.r_constraint_name = rcc.constraint_name"
"\nAND acc.position = rcc.position"
"\nWHERE ac.table_name = :table_name"
"\nAND ac.constraint_type = 'R'"
)

text = text % {"dblink": dblink}
if schema is not None:
text += "\nAND ac.owner = CAST(:owner AS VARCHAR2(128))"

text += "\nORDER BY constraint_name, loc_pos"

rp = self._inspector_instance.bind.execute(sql.text(text), params)
constraint_data = rp.fetchall()
return constraint_data
return list(rp.fetchall())

def get_pk_constraint(
self, table_name: str, schema: Optional[str] = None, dblink: str = ""
) -> Dict:

denormalized_table_name = self._inspector_instance.dialect.denormalize_name(
table_name
)
assert denormalized_table_name

schema = self._inspector_instance.dialect.denormalize_name(
schema or self.default_schema_name
)

if schema is None:
schema = self._inspector_instance.dialect.default_schema_name

pkeys = []
constraint_name = None
constraint_data = self._get_constraint_data(
denormalized_table_name, schema, dblink
)

for row in constraint_data:
(
cons_name,
cons_type,
local_column,
remote_table,
remote_column,
remote_owner,
) = row[0:2] + tuple(
[self._inspector_instance.dialect.normalize_name(x) for x in row[2:6]]
try:
for row in self._get_constraint_data(table_name, schema, dblink):
if row[1] == "P": # constraint_type is 'P' for primary key
if constraint_name is None:
constraint_name = (
self._inspector_instance.dialect.normalize_name(row[0])
)
col_name = self._inspector_instance.dialect.normalize_name(
row[2]
) # local_column
pkeys.append(col_name)
except Exception as e:
logger.error(
f"Error processing PK constraint data for {schema}.{table_name}: {str(e)}"
)
if cons_type == "P":
if constraint_name is None:
constraint_name = self._inspector_instance.dialect.normalize_name(
cons_name
)
pkeys.append(local_column)
# Return empty constraint if we can't process it
return {"constrained_columns": [], "name": None}

return {"constrained_columns": pkeys, "name": constraint_name}

Expand Down Expand Up @@ -557,8 +565,8 @@ def get_view_definition(
text = "SELECT text FROM dba_views WHERE view_name=:view_name"

if schema is not None:
text += " AND owner = :schema"
params["schema"] = schema
params["owner"] = schema
text += "\nAND ac.owner = CAST(:owner AS VARCHAR2(128))"

rp = self._inspector_instance.bind.execute(sql.text(text), params).scalar()

Expand Down
Loading