Skip to content

Commit

Permalink
feat(ingestion/tableau): support column level lineage for custom sql (#…
Browse files Browse the repository at this point in the history
…8466)

Co-authored-by: MohdSiddiqueBagwan <[email protected]>
  • Loading branch information
2 people authored and yoonhyejin committed Aug 24, 2023
1 parent 22ed350 commit 4bdc9bf
Show file tree
Hide file tree
Showing 12 changed files with 44,478 additions and 95 deletions.
274 changes: 220 additions & 54 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,24 @@
from dataclasses import dataclass
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Set,
Tuple,
Union,
cast,
)

import dateutil.parser as dp
import tableauserverclient as TSC
from pydantic import root_validator, validator
from pydantic.fields import Field
from requests.adapters import ConnectionError
from sqllineage.runner import LineageRunner
from tableauserverclient import (
PersonalAccessTokenAuth,
Server,
Expand Down Expand Up @@ -71,8 +81,11 @@
dashboard_graphql_query,
database_tables_graphql_query,
embedded_datasource_graphql_query,
get_overridden_info,
get_unique_custom_sql,
make_fine_grained_lineage_class,
make_table_urn,
make_upstream_class,
published_datasource_graphql_query,
query_metadata,
sheet_graphql_query,
Expand Down Expand Up @@ -120,10 +133,15 @@
OwnershipClass,
OwnershipTypeClass,
SubTypesClass,
UpstreamClass,
ViewPropertiesClass,
)
from datahub.utilities import config_clean
from datahub.utilities.sqlglot_lineage import (
ColumnLineageInfo,
SchemaResolver,
SqlParsingResult,
sqlglot_lineage,
)

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -870,14 +888,14 @@ def _create_upstream_table_lineage(
f"A total of {len(upstream_tables)} upstream table edges found for datasource {datasource[tableau_constant.ID]}"
)

if datasource.get(tableau_constant.FIELDS):
datasource_urn = builder.make_dataset_urn_with_platform_instance(
platform=self.platform,
name=datasource[tableau_constant.ID],
platform_instance=self.config.platform_instance,
env=self.config.env,
)
datasource_urn = builder.make_dataset_urn_with_platform_instance(
platform=self.platform,
name=datasource[tableau_constant.ID],
platform_instance=self.config.platform_instance,
env=self.config.env,
)

if datasource.get(tableau_constant.FIELDS):
if self.config.extract_column_level_lineage:
# Find fine grained lineage for datasource column to datasource column edge,
# upstream columns may be from same datasource
Expand Down Expand Up @@ -1140,6 +1158,57 @@ def get_upstream_fields_of_field_in_datasource(self, datasource, datasource_urn)
)
return fine_grained_lineages

def get_upstream_fields_from_custom_sql(
    self, datasource: dict, datasource_urn: str
) -> List[FineGrainedLineage]:
    """Build column-level lineage for a custom-SQL datasource by parsing its raw query.

    Parses the datasource's SQL (without applying any configured overrides) and
    converts each column-lineage entry into a FineGrainedLineage whose
    downstream is a schema-field of this datasource and whose upstreams are the
    schema-fields of the source tables.

    Returns an empty list when the SQL could not be parsed.
    """
    lineages: List[FineGrainedLineage] = []

    parsed_result = self.parse_custom_sql(
        datasource=datasource,
        datasource_urn=datasource_urn,
        env=self.config.env,
        platform=self.platform,
        platform_instance=self.config.platform_instance,
        # No override callback: use the configuration values untouched here.
        func_overridden_info=None,
    )

    if parsed_result is None:
        logger.info(
            f"Failed to extract column level lineage from datasource {datasource_urn}"
        )
        return lineages

    column_lineage: List[ColumnLineageInfo] = (
        parsed_result.column_lineage
        if parsed_result.column_lineage is not None
        else []
    )

    for entry in column_lineage:
        # Downstream is this datasource's own column, when the parser produced one.
        if entry.downstream is not None and entry.downstream.column is not None:
            downstream_fields = [
                builder.make_schema_field_urn(datasource_urn, entry.downstream.column)
            ]
        else:
            downstream_fields = []

        upstream_fields = [
            builder.make_schema_field_urn(ref.table, ref.column)
            for ref in entry.upstreams
        ]

        lineages.append(
            FineGrainedLineage(
                downstreamType=FineGrainedLineageDownstreamType.FIELD,
                downstreams=downstream_fields,
                upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
                upstreams=upstream_fields,
            )
        )

    return lineages

def get_transform_operation(self, field):
field_type = field[tableau_constant.TYPE_NAME]
if field_type in (
Expand Down Expand Up @@ -1176,6 +1245,7 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
platform_instance=self.config.platform_instance,
env=self.config.env,
)

dataset_snapshot = DatasetSnapshot(
urn=csql_urn,
aspects=[self.get_data_platform_instance()],
Expand Down Expand Up @@ -1225,14 +1295,20 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
csql_urn, tables, datasource
)
elif self.config.extract_lineage_from_unsupported_custom_sql_queries:
logger.debug("Extracting TLL & CLL from custom sql")
# custom sql tables may contain unsupported sql, causing incomplete lineage
# we extract the lineage from the raw queries
yield from self._create_lineage_from_unsupported_csql(
csql_urn, csql
)

# Schema Metadata
columns = csql.get(tableau_constant.COLUMNS, [])
# if condition is needed as graphQL returns "columns": None
columns: List[Dict[Any, Any]] = (
cast(List[Dict[Any, Any]], csql.get(tableau_constant.COLUMNS))
if tableau_constant.COLUMNS in csql
and csql.get(tableau_constant.COLUMNS) is not None
else []
)
schema_metadata = self.get_schema_metadata_for_custom_sql(columns)
if schema_metadata is not None:
dataset_snapshot.aspects.append(schema_metadata)
Expand Down Expand Up @@ -1450,53 +1526,143 @@ def _create_lineage_to_upstream_tables(
aspect=upstream_lineage,
)

def parse_custom_sql(
    self,
    datasource: dict,
    datasource_urn: str,
    platform: str,
    env: str,
    platform_instance: Optional[str],
    func_overridden_info: Optional[
        Callable[
            [
                str,
                Optional[str],
                Optional[Dict[str, str]],
                Optional[TableauLineageOverrides],
            ],
            Tuple[Optional[str], Optional[str], str, str],
        ]
    ],
) -> Optional["SqlParsingResult"]:
    """Parse the raw SQL of a custom-SQL datasource into table/column lineage.

    Args:
        datasource: Tableau GraphQL response dict for the custom-SQL datasource.
        datasource_urn: DataHub URN of the datasource (used in log messages only).
        platform: data platform used to resolve upstream table schemas/URNs.
        env: DataHub environment for upstream table URNs.
        platform_instance: optional platform instance for upstream table URNs.
        func_overridden_info: optional callback that receives the connection
            type, database name, the configured platform-instance map and
            lineage overrides, and returns overridden
            (upstream_db, platform_instance, platform, _) values; pass None to
            use the arguments above as-is.

    Returns:
        The SqlParsingResult from sqlglot_lineage, or None when the datasource
        is not unsupported custom SQL, database info or the raw query is
        missing, or parsing raised an exception.
    """
    database_info = datasource.get(tableau_constant.DATABASE) or {}

    # Only "unsupported custom SQL" datasources carry a raw query we must parse
    # ourselves; everything else gets lineage through the regular GraphQL path.
    if datasource.get(tableau_constant.IS_UNSUPPORTED_CUSTOM_SQL) in (None, False):
        logger.debug(f"datasource {datasource_urn} is not created from custom sql")
        return None

    if (
        tableau_constant.NAME not in database_info
        or tableau_constant.CONNECTION_TYPE not in database_info
    ):
        logger.debug(
            f"database information is missing from datasource {datasource_urn}"
        )
        return None

    query = datasource.get(tableau_constant.QUERY)
    if query is None:
        logger.debug(
            f"raw sql query is not available for datasource {datasource_urn}"
        )
        return None

    logger.debug(f"Parsing sql={query}")

    upstream_db = database_info.get(tableau_constant.NAME)

    if func_overridden_info is not None:
        # Override the information as per configuration
        upstream_db, platform_instance, platform, _ = func_overridden_info(
            database_info[tableau_constant.CONNECTION_TYPE],
            database_info.get(tableau_constant.NAME),
            self.config.platform_instance_map,
            self.config.lineage_overrides,
        )

    logger.debug(
        f"Overridden info upstream_db={upstream_db}, platform_instance={platform_instance}, platform={platform}"
    )

    parsed_result: Optional["SqlParsingResult"] = None
    try:
        # Prefer the graph-backed resolver: real schemas from DataHub make the
        # column-level lineage far more accurate than offline parsing alone.
        schema_resolver = (
            self.ctx.graph._make_schema_resolver(
                platform=platform,
                platform_instance=platform_instance,
                env=env,
            )
            if self.ctx.graph is not None
            else SchemaResolver(
                platform=platform,
                platform_instance=platform_instance,
                env=env,
                graph=None,
            )
        )

        if schema_resolver.graph is None:
            logger.warning(
                "Column Level Lineage extraction would not work as DataHub graph client is None."
            )

        parsed_result = sqlglot_lineage(
            query,
            schema_resolver=schema_resolver,
            default_db=upstream_db,
        )
    except Exception as e:
        # Best-effort: surface the failure in the ingestion report instead of
        # aborting the whole source run.
        self.report.report_warning(
            key="csql-lineage",
            reason=f"Unable to retrieve lineage from query. "
            f"Query: {query} "
            f"Reason: {str(e)} ",
        )

    return parsed_result

def _create_lineage_from_unsupported_csql(
    self, csql_urn: str, csql: dict
) -> Iterable[MetadataWorkUnit]:
    """Emit an UpstreamLineage aspect for a custom-SQL datasource by parsing its raw query.

    Table-level upstreams are always extracted; column-level (fine-grained)
    lineage is added only when `extract_column_level_lineage` is enabled in the
    config. Yields nothing when the query could not be parsed.
    """
    parsed = self.parse_custom_sql(
        datasource=csql,
        datasource_urn=csql_urn,
        env=self.config.env,
        platform=self.platform,
        platform_instance=self.config.platform_instance,
        # Apply the configured overrides (platform instance map, lineage overrides).
        func_overridden_info=get_overridden_info,
    )

    if parsed is None:
        logger.info(
            f"Failed to extract table level lineage for datasource {csql_urn}"
        )
        return

    upstream_tables = make_upstream_class(parsed)
    logger.debug(f"Upstream tables = {upstream_tables}")

    column_lineages: List[FineGrainedLineage] = []
    if self.config.extract_column_level_lineage:
        logger.info("Extracting CLL from custom sql")
        column_lineages = make_fine_grained_lineage_class(parsed, csql_urn)

    aspect = UpstreamLineage(
        upstreams=upstream_tables,
        fineGrainedLineages=column_lineages,
    )

    yield self.get_metadata_change_proposal(
        csql_urn,
        aspect_name=tableau_constant.UPSTREAM_LINEAGE,
        aspect=aspect,
    )

def _get_schema_metadata_for_datasource(
self, datasource_fields: List[dict]
) -> Optional[SchemaMetadata]:
Expand Down
Loading

0 comments on commit 4bdc9bf

Please sign in to comment.