diff --git a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra.py b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra.py index 95f1c892c905c..eb7741c82929b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra.py +++ b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra.py @@ -4,7 +4,6 @@ from dataclasses import dataclass, field from typing import Any, Dict, Iterable, List, Optional -from datahub.api.entities.dataset.dataset import Dataset from datahub.emitter.mce_builder import ( make_data_platform_urn, make_dataplatform_instance_urn, @@ -34,7 +33,6 @@ from datahub.ingestion.source.cassandra.cassandra_utils import ( COL_NAMES, SYSTEM_KEYSPACE_LIST, - VERSION, CassandraToSchemaFieldConverter, ) from datahub.ingestion.source.common.subtypes import ( @@ -497,16 +495,13 @@ def get_upstream_fields_of_field_in_datasource( fine_grained_lineages = [] for column_info in column_infos: source_column = column_info.column_name - column_type = column_info.type if source_column: - field_path = f"{VERSION}.[type={column_type}].{source_column}" - field_path_v1 = Dataset._simplify_field_path(field_path) fine_grained_lineages.append( FineGrainedLineageClass( upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET, downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, - downstreams=[make_schema_field_urn(dataset_urn, field_path_v1)], - upstreams=[make_schema_field_urn(upstream_urn, field_path_v1)], + downstreams=[make_schema_field_urn(dataset_urn, source_column)], + upstreams=[make_schema_field_urn(upstream_urn, source_column)], ) ) return fine_grained_lineages diff --git a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py index d56044f48a78d..c2331e7627881 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py @@ -1,4 +1,3 @@ -import json import logging from typing import Dict, Generator, List, Optional, Type @@ -26,7 +25,6 @@ ["system", "system_auth", "system_schema", "system_distributed", "system_traces"] ) -VERSION: str = "[version=2.0]" # these column names are present on the system_schema tables COL_NAMES = { @@ -112,12 +110,6 @@ def get_column_type(cassandra_column_type: str) -> SchemaFieldDataType: return SchemaFieldDataType(type=type_class()) - def __init__(self) -> None: - self._prefix_name_stack: List[str] = [VERSION] - - def _get_cur_field_path(self) -> str: - return ".".join(self._prefix_name_stack) - def _get_schema_fields( self, cassandra_column_infos: List ) -> Generator[SchemaField, None, None]: @@ -132,28 +124,18 @@ def _get_schema_fields( column_name: str = column_info[COL_NAMES["column_name"]] cassandra_type: str = column_info[COL_NAMES["column_type"]] - if cassandra_type is not None: - self._prefix_name_stack.append(f"[type={cassandra_type}].{column_name}") - schema_field_data_type: SchemaFieldDataType = self.get_column_type( - cassandra_type - ) - schema_field: SchemaField = SchemaField( - fieldPath=self._get_cur_field_path(), - nativeDataType=cassandra_type, - type=schema_field_data_type, - description=None, - nullable=True, - recursive=False, - ) - yield schema_field - self._prefix_name_stack.pop() - else: - # Unexpected! Log a warning. - logger.warning( - f"Cassandra schema does not have 'type'!" - f" Schema={json.dumps(column_info)}" - ) - continue + schema_field_data_type: SchemaFieldDataType = self.get_column_type( + cassandra_type + ) + schema_field: SchemaField = SchemaField( + fieldPath=column_name, + nativeDataType=cassandra_type, + type=schema_field_data_type, + description=None, + nullable=True, + recursive=False, + ) + yield schema_field @classmethod def get_schema_fields(