diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py index 0c6c80a10b596..021b9904d28f2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py @@ -208,7 +208,7 @@ class HiveStorageLineageConfig(BaseModel): default=True, description="When enabled, column-level lineage will be extracted from storage", ) - platform_instance: Optional[str] = Field( + storage_platform_instance: Optional[str] = Field( default=None, description="Platform instance for the storage system", ) @@ -227,12 +227,6 @@ def _validate_direction(cls, direction: str) -> str: ) return direction.lower() - @property - def storage_platform_instance(self) -> Optional[str]: - """Get the storage platform instance, defaulting to same as Hive if not specified""" - - return self.platform_instance - @dataclass class HiveStorageSourceReport: @@ -259,13 +253,10 @@ def __init__( self, config: HiveStorageLineageConfig, env: str, - platform_instance: Optional[str] = None, convert_urns_to_lowercase: bool = False, ): self.config = config self.env = env - if self.config.storage_platform_instance is None: - self.config.platform_instance = platform_instance self.convert_urns_to_lowercase = convert_urns_to_lowercase self.report = HiveStorageSourceReport() @@ -292,6 +283,7 @@ def _make_storage_dataset_urn( Returns tuple of (urn, platform) if successful, None otherwise. """ + platform_instance = None storage_info = StoragePathParser.parse_storage_location(storage_location) if not storage_info: logger.debug(f"Could not parse storage location: {storage_location}") @@ -304,7 +296,7 @@ def _make_storage_dataset_urn( platform_name = platform_name.lower() path = path.lower() if self.config.storage_platform_instance: - self.config.platform_instance = ( + platform_instance = ( self.config.storage_platform_instance.lower() ) @@ -313,7 +305,7 @@ def _make_storage_dataset_urn( platform=platform_name, name=path, env=self.env, - platform_instance=self.config.storage_platform_instance, + platform_instance=platform_instance, ) return storage_urn, platform_name except Exception as exp: @@ -360,7 +352,9 @@ def normalize_field_path(field_path: str) -> str: upstreams=[ make_schema_field_urn( parent_urn=storage_urn, - field_path=normalize_field_path(matching_field.fieldPath), + field_path=normalize_field_path( + matching_field.fieldPath + ), ) ], downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, @@ -386,7 +380,9 @@ def normalize_field_path(field_path: str) -> str: downstreams=[ make_schema_field_urn( parent_urn=storage_urn, - field_path=normalize_field_path(matching_field.fieldPath), + field_path=normalize_field_path( + matching_field.fieldPath + ), ) ], ) @@ -424,6 +420,7 @@ def get_storage_dataset_mcp( This creates the storage dataset entity in DataHub. """ + platform_instance = None storage_info = StoragePathParser.parse_storage_location( storage_location, ) @@ -436,8 +433,10 @@ def get_storage_dataset_mcp( if self.convert_urns_to_lowercase: platform_name = platform_name.lower() path = path.lower() - if platform_instance: - platform_instance = platform_instance.lower() + if self.config.storage_platform_instance: + platform_instance = ( + self.config.storage_platform_instance.lower() + ) try: storage_urn = make_dataset_urn_with_platform_instance( @@ -512,6 +511,8 @@ def get_lineage_mcp( MetadataWorkUnit containing the lineage MCP if successful """ + platform_instance = None + if not self.config.enabled: return None @@ -529,12 +530,17 @@ def get_lineage_mcp( storage_urn, storage_platform = storage_info self.report.report_location_scanned() + if self.config.storage_platform_instance: + platform_instance = ( + self.config.storage_platform_instance.lower() + ) + workunits = [] # Create storage dataset entity storage_mcps = self.get_storage_dataset_mcp( storage_location=storage_location, - platform_instance=self.config.storage_platform_instance, + platform_instance=platform_instance, schema_metadata=dataset_schema, ) if storage_mcps: @@ -731,7 +737,6 @@ def __init__(self, config, ctx): self.storage_lineage = HiveStorageLineage( config=config.storage_lineage, env=config.env, - platform_instance=config.platform_instance, convert_urns_to_lowercase=config.convert_urns_to_lowercase, )