Skip to content

Commit

Permalink
fix: change field path from v2 to v1
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Nov 13, 2024
1 parent a5f9f0f commit b611c8b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional

from datahub.api.entities.dataset.dataset import Dataset
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
Expand Down Expand Up @@ -34,7 +33,6 @@
from datahub.ingestion.source.cassandra.cassandra_utils import (
COL_NAMES,
SYSTEM_KEYSPACE_LIST,
VERSION,
CassandraToSchemaFieldConverter,
)
from datahub.ingestion.source.common.subtypes import (
Expand Down Expand Up @@ -497,16 +495,13 @@ def get_upstream_fields_of_field_in_datasource(
fine_grained_lineages = []
for column_info in column_infos:
source_column = column_info.column_name
column_type = column_info.type
if source_column:
field_path = f"{VERSION}.[type={column_type}].{source_column}"
field_path_v1 = Dataset._simplify_field_path(field_path)
fine_grained_lineages.append(
FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
downstreams=[make_schema_field_urn(dataset_urn, field_path_v1)],
upstreams=[make_schema_field_urn(upstream_urn, field_path_v1)],
downstreams=[make_schema_field_urn(dataset_urn, source_column)],
upstreams=[make_schema_field_urn(upstream_urn, source_column)],
)
)
return fine_grained_lineages
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import logging
from typing import Dict, Generator, List, Optional, Type

Expand Down Expand Up @@ -26,7 +25,6 @@
["system", "system_auth", "system_schema", "system_distributed", "system_traces"]
)

VERSION: str = "[version=2.0]"

# these column names are present on the system_schema tables
COL_NAMES = {
Expand Down Expand Up @@ -112,12 +110,6 @@ def get_column_type(cassandra_column_type: str) -> SchemaFieldDataType:

return SchemaFieldDataType(type=type_class())

def __init__(self) -> None:
self._prefix_name_stack: List[str] = [VERSION]

def _get_cur_field_path(self) -> str:
return ".".join(self._prefix_name_stack)

def _get_schema_fields(
self, cassandra_column_infos: List
) -> Generator[SchemaField, None, None]:
Expand All @@ -132,28 +124,18 @@ def _get_schema_fields(
column_name: str = column_info[COL_NAMES["column_name"]]
cassandra_type: str = column_info[COL_NAMES["column_type"]]

if cassandra_type is not None:
self._prefix_name_stack.append(f"[type={cassandra_type}].{column_name}")
schema_field_data_type: SchemaFieldDataType = self.get_column_type(
cassandra_type
)
schema_field: SchemaField = SchemaField(
fieldPath=self._get_cur_field_path(),
nativeDataType=cassandra_type,
type=schema_field_data_type,
description=None,
nullable=True,
recursive=False,
)
yield schema_field
self._prefix_name_stack.pop()
else:
# Unexpected! Log a warning.
logger.warning(
f"Cassandra schema does not have 'type'!"
f" Schema={json.dumps(column_info)}"
)
continue
schema_field_data_type: SchemaFieldDataType = self.get_column_type(
cassandra_type
)
schema_field: SchemaField = SchemaField(
fieldPath=column_name,
nativeDataType=cassandra_type,
type=schema_field_data_type,
description=None,
nullable=True,
recursive=False,
)
yield schema_field

@classmethod
def get_schema_fields(
Expand Down

0 comments on commit b611c8b

Please sign in to comment.