Update hive.py
acrylJonny committed Nov 13, 2024
1 parent 5f22cc6 · commit a731406
Showing 1 changed file with 23 additions and 18 deletions.
41 changes: 23 additions & 18 deletions metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
@@ -208,7 +208,7 @@ class HiveStorageLineageConfig(BaseModel):
         default=True,
         description="When enabled, column-level lineage will be extracted from storage",
     )
-    platform_instance: Optional[str] = Field(
+    storage_platform_instance: Optional[str] = Field(
         default=None,
         description="Platform instance for the storage system",
     )
@@ -227,12 +227,6 @@ def _validate_direction(cls, direction: str) -> str:
             )
         return direction.lower()
 
-    @property
-    def storage_platform_instance(self) -> Optional[str]:
-        """Get the storage platform instance, defaulting to same as Hive if not specified"""
-
-        return self.platform_instance
-
 
 @dataclass
 class HiveStorageSourceReport:
@@ -259,13 +253,10 @@ def __init__(
         self,
         config: HiveStorageLineageConfig,
         env: str,
-        platform_instance: Optional[str] = None,
         convert_urns_to_lowercase: bool = False,
     ):
         self.config = config
         self.env = env
-        if self.config.storage_platform_instance is None:
-            self.config.platform_instance = platform_instance
         self.convert_urns_to_lowercase = convert_urns_to_lowercase
         self.report = HiveStorageSourceReport()
 
@@ -292,6 +283,7 @@ def _make_storage_dataset_urn(
         Returns tuple of (urn, platform) if successful, None otherwise.
         """
 
+        platform_instance = None
         storage_info = StoragePathParser.parse_storage_location(storage_location)
         if not storage_info:
             logger.debug(f"Could not parse storage location: {storage_location}")
@@ -304,7 +296,7 @@
             platform_name = platform_name.lower()
             path = path.lower()
             if self.config.storage_platform_instance:
-                self.config.platform_instance = (
+                platform_instance = (
                     self.config.storage_platform_instance.lower()
                 )
 
@@ -313,7 +305,7 @@
                 platform=platform_name,
                 name=path,
                 env=self.env,
-                platform_instance=self.config.storage_platform_instance,
+                platform_instance=platform_instance,
             )
             return storage_urn, platform_name
         except Exception as exp:
@@ -360,7 +352,9 @@ def normalize_field_path(field_path: str) -> str:
                         upstreams=[
                             make_schema_field_urn(
                                 parent_urn=storage_urn,
-                                field_path=normalize_field_path(matching_field.fieldPath),
+                                field_path=normalize_field_path(
+                                    matching_field.fieldPath
+                                ),
                             )
                         ],
                         downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
@@ -386,7 +380,9 @@ def normalize_field_path(field_path: str) -> str:
                         downstreams=[
                             make_schema_field_urn(
                                 parent_urn=storage_urn,
-                                field_path=normalize_field_path(matching_field.fieldPath),
+                                field_path=normalize_field_path(
+                                    matching_field.fieldPath
+                                ),
                             )
                         ],
                     )
@@ -424,6 +420,7 @@ def get_storage_dataset_mcp(
         This creates the storage dataset entity in DataHub.
         """
 
+        platform_instance = None
         storage_info = StoragePathParser.parse_storage_location(
             storage_location,
         )
@@ -436,8 +433,10 @@
         if self.convert_urns_to_lowercase:
             platform_name = platform_name.lower()
             path = path.lower()
-            if platform_instance:
-                platform_instance = platform_instance.lower()
+            if self.config.storage_platform_instance:
+                platform_instance = (
+                    self.config.storage_platform_instance.lower()
+                )
 
         try:
             storage_urn = make_dataset_urn_with_platform_instance(
@@ -512,6 +511,8 @@ def get_lineage_mcp(
             MetadataWorkUnit containing the lineage MCP if successful
         """
 
+        platform_instance = None
+
         if not self.config.enabled:
             return None
 
@@ -529,12 +530,17 @@
         storage_urn, storage_platform = storage_info
         self.report.report_location_scanned()
 
+        if self.config.storage_platform_instance:
+            platform_instance = (
+                self.config.storage_platform_instance.lower()
+            )
+
         workunits = []
 
         # Create storage dataset entity
         storage_mcps = self.get_storage_dataset_mcp(
             storage_location=storage_location,
-            platform_instance=self.config.storage_platform_instance,
+            platform_instance=platform_instance,
             schema_metadata=dataset_schema,
         )
         if storage_mcps:
@@ -731,7 +737,6 @@ def __init__(self, config, ctx):
         self.storage_lineage = HiveStorageLineage(
             config=config.storage_lineage,
             env=config.env,
-            platform_instance=config.platform_instance,
             convert_urns_to_lowercase=config.convert_urns_to_lowercase,
         )
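For readers skimming the diff: the net effect is that the storage platform instance now comes only from the renamed storage_platform_instance field on the lineage config and is resolved into a local variable, rather than being written back onto the config or falling back to the Hive source's own platform_instance (both the removed @property fallback and the removed __init__ fallback handled that before). Below is a minimal sketch of the resolution pattern, using simplified stand-ins rather than the real HiveStorageLineageConfig/HiveStorageLineage classes; the helper name resolve_storage_platform_instance is illustrative only.

from dataclasses import dataclass
from typing import Optional


@dataclass
class StorageLineageConfig:
    """Stand-in for the relevant piece of HiveStorageLineageConfig."""

    storage_platform_instance: Optional[str] = None


def resolve_storage_platform_instance(
    config: StorageLineageConfig,
    convert_urns_to_lowercase: bool,
) -> Optional[str]:
    """Derive a local platform_instance value without mutating the config object."""
    platform_instance = None
    if config.storage_platform_instance:
        platform_instance = (
            config.storage_platform_instance.lower()
            if convert_urns_to_lowercase
            else config.storage_platform_instance
        )
    return platform_instance


# No configured storage_platform_instance now means no platform instance on the
# storage URN; there is no fallback to the Hive source's platform_instance.
assert resolve_storage_platform_instance(StorageLineageConfig(), True) is None
assert resolve_storage_platform_instance(StorageLineageConfig("PROD_S3"), True) == "prod_s3"

Judging from the removed fallback in __init__ and the dropped platform_instance argument in the Hive source constructor, users who previously relied on storage datasets inheriting Hive's platform_instance will now need to set storage_platform_instance explicitly.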
