Skip to content

Commit

Permalink
Update hive.py
Browse files Browse the repository at this point in the history
  • Loading branch information
acrylJonny committed Nov 13, 2024
1 parent f478463 commit f305ea0
Showing 1 changed file with 30 additions and 4 deletions.
34 changes: 30 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
make_schema_field_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.decorators import (
Expand Down Expand Up @@ -357,20 +358,36 @@ def normalize_field_path(field_path: str) -> str:
FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
upstreams=[
f"{storage_urn}.{normalize_field_path(matching_field.fieldPath)}"
make_schema_field_urn(
parent_urn=storage_urn,
field_path=normalize_field_path(matching_field.fieldPath),
)
],
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
downstreams=[f"{dataset_urn}.{dataset_path}"],
downstreams=[
make_schema_field_urn(
parent_urn=dataset_urn,
field_path=dataset_path,
)
],
)
)
else:
fine_grained_lineages.append(
FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
upstreams=[f"{dataset_urn}.{dataset_path}"],
upstreams=[
make_schema_field_urn(
parent_urn=dataset_urn,
field_path=dataset_path,
)
],
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
downstreams=[
f"{storage_urn}.{normalize_field_path(matching_field.fieldPath)}"
make_schema_field_urn(
parent_urn=storage_urn,
field_path=normalize_field_path(matching_field.fieldPath),
)
],
)
)
Expand Down Expand Up @@ -432,6 +449,15 @@ def get_storage_dataset_mcp(

mcps = []

props = DatasetPropertiesClass(name=path)

mcps.append(
MetadataChangeProposalWrapper(
entityUrn=storage_urn,
aspect=props,
)
)

# Add platform instance
platform_instance_aspect = self._make_dataset_platform_instance(
platform=platform_name,
Expand Down

0 comments on commit f305ea0

Please sign in to comment.