From f74d7e3633c7c83af2f80f91974805b9534d7a46 Mon Sep 17 00:00:00 2001 From: mohdsiddique Date: Thu, 3 Aug 2023 08:30:50 +0530 Subject: [PATCH] feat(ingestion/snowflake): use user email-id in urn generation for top users stat (#8513) Co-authored-by: MohdSiddiqueBagwan --- docs/how/updating-datahub.md | 2 +- .../source/snowflake/snowflake_config.py | 5 +++++ .../source/snowflake/snowflake_usage_v2.py | 22 +++++++++++++++---- .../source/snowflake/snowflake_utils.py | 11 ++++++++-- .../snowflake/snowflake_golden.json | 20 ++++++++--------- .../integration/snowflake/test_snowflake.py | 1 + 6 files changed, 44 insertions(+), 17 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 6e330e52128e0..01ca6badd5b45 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -26,7 +26,7 @@ certain column-level metrics. Instead, set `profile_table_level_only` to `false` individually enable / disable desired field metrics. - #8451: The `bigquery-beta` and `snowflake-beta` source aliases have been dropped. Use `bigquery` and `snowflake` as the source type instead. - #8472: Ingestion runs created with Pipeline.create will show up in the DataHub ingestion tab as CLI-based runs. To revert to the previous behavior of not showing these runs in DataHub, pass `no_default_report=True`. - +- #8513: `snowflake` connector will use user's `email` attribute as is in urn. To revert to previous behavior disable `email_as_user_identifier` in recipe. ### Potential Downtime - BrowsePathsV2 upgrade will now be handled by the `system-update` job in non-blocking mode. This process generates data needed for the new search diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index 79bf538af91d2..e8e80e172a9ce 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -120,6 +120,11 @@ class SnowflakeV2Config( "upstreams_deny_pattern", "temporary_tables_pattern" ) + email_as_user_identifier: bool = Field( + default=True, + description="Format user urns as an email, if the snowflake user's email is set. If `email_domain` is provided, generates email addresses for snowflake users with unset emails, based on their username.", + ) + @validator("include_column_lineage") def validate_include_column_lineage(cls, v, values): if not values.get("include_table_lineage") and v: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py index 7cdc2283a08d6..3605205b6055c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py @@ -218,7 +218,9 @@ def build_usage_statistics_for_dataset(self, dataset_identifier, row): ) if self.config.include_top_n_queries else None, - userCounts=self._map_user_counts(json.loads(row["USER_COUNTS"])), + userCounts=self._map_user_counts( + json.loads(row["USER_COUNTS"]), + ), fieldCounts=self._map_field_counts(json.loads(row["FIELD_COUNTS"])), ) @@ -247,7 +249,10 @@ def _map_top_sql_queries(self, top_sql_queries: Dict) -> List[str]: ] ) - def _map_user_counts(self, user_counts: Dict) -> List[DatasetUserUsageCounts]: + def _map_user_counts( + self, + user_counts: Dict, + ) -> List[DatasetUserUsageCounts]: filtered_user_counts = [] for user_count in user_counts: user_email = user_count.get("email") @@ -261,7 +266,11 @@ def _map_user_counts(self, user_counts: Dict) -> List[DatasetUserUsageCounts]: filtered_user_counts.append( DatasetUserUsageCounts( user=make_user_urn( - self.get_user_identifier(user_count["user_name"], user_email) + self.get_user_identifier( + user_count["user_name"], + user_email, + self.config.email_as_user_identifier, + ) ), count=user_count["total"], # NOTE: Generated emails may be incorrect, as email may be different than @@ -347,6 +356,7 @@ def _check_usage_date_ranges(self) -> Any: def _get_operation_aspect_work_unit( self, event: SnowflakeJoinedAccessEvent, discovered_datasets: List[str] ) -> Iterable[MetadataWorkUnit]: + if event.query_start_time and event.query_type: start_time = event.query_start_time query_type = event.query_type @@ -357,7 +367,11 @@ def _get_operation_aspect_work_unit( ) reported_time: int = int(time.time() * 1000) last_updated_timestamp: int = int(start_time.timestamp() * 1000) - user_urn = make_user_urn(self.get_user_identifier(user_name, user_email)) + user_urn = make_user_urn( + self.get_user_identifier( + user_name, user_email, self.config.email_as_user_identifier + ) + ) # NOTE: In earlier `snowflake-usage` connector this was base_objects_accessed, which is incorrect for obj in event.objects_modified: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py index c003c348e79c5..5a451bf197d34 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py @@ -199,10 +199,17 @@ def get_dataset_identifier_from_qualified_name( # Users without email were skipped from both user entries as well as aggregates. # However email is not mandatory field in snowflake user, user_name is always present. def get_user_identifier( - self: SnowflakeCommonProtocol, user_name: str, user_email: Optional[str] + self: SnowflakeCommonProtocol, + user_name: str, + user_email: Optional[str], + email_as_user_identifier: bool, ) -> str: if user_email: - return self.snowflake_identifier(user_email.split("@")[0]) + return self.snowflake_identifier( + user_email + if email_as_user_identifier is True + else user_email.split("@")[0] + ) return self.snowflake_identifier(user_name) # TODO: Revisit this after stateful ingestion can commit checkpoint diff --git a/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json b/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json index 59f096af2419c..dd56eff79c6f6 100644 --- a/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json +++ b/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json @@ -5779,7 +5779,7 @@ "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, - "actor": "urn:li:corpuser:abc", + "actor": "urn:li:corpuser:abc@xyz.com", "operationType": "CREATE", "lastUpdatedTimestamp": 1654144861367 } @@ -5801,7 +5801,7 @@ "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, - "actor": "urn:li:corpuser:abc", + "actor": "urn:li:corpuser:abc@xyz.com", "operationType": "CREATE", "lastUpdatedTimestamp": 1654144861367 } @@ -5823,7 +5823,7 @@ "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, - "actor": "urn:li:corpuser:abc", + "actor": "urn:li:corpuser:abc@xyz.com", "operationType": "CREATE", "lastUpdatedTimestamp": 1654144861367 } @@ -5845,7 +5845,7 @@ "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, - "actor": "urn:li:corpuser:abc", + "actor": "urn:li:corpuser:abc@xyz.com", "operationType": "CREATE", "lastUpdatedTimestamp": 1654144861367 } @@ -5867,7 +5867,7 @@ "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, - "actor": "urn:li:corpuser:abc", + "actor": "urn:li:corpuser:abc@xyz.com", "operationType": "CREATE", "lastUpdatedTimestamp": 1654144861367 } @@ -5889,7 +5889,7 @@ "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, - "actor": "urn:li:corpuser:abc", + "actor": "urn:li:corpuser:abc@xyz.com", "operationType": "CREATE", "lastUpdatedTimestamp": 1654144861367 } @@ -5911,7 +5911,7 @@ "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, - "actor": "urn:li:corpuser:abc", + "actor": "urn:li:corpuser:abc@xyz.com", "operationType": "CREATE", "lastUpdatedTimestamp": 1654144861367 } @@ -5933,7 +5933,7 @@ "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, - "actor": "urn:li:corpuser:abc", + "actor": "urn:li:corpuser:abc@xyz.com", "operationType": "CREATE", "lastUpdatedTimestamp": 1654144861367 } @@ -5955,7 +5955,7 @@ "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, - "actor": "urn:li:corpuser:abc", + "actor": "urn:li:corpuser:abc@xyz.com", "operationType": "CREATE", "lastUpdatedTimestamp": 1654144861367 } @@ -5977,7 +5977,7 @@ "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, - "actor": "urn:li:corpuser:abc", + "actor": "urn:li:corpuser:abc@xyz.com", "operationType": "CREATE", "lastUpdatedTimestamp": 1654144861367 } diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py index 2703daf54854b..53b2bcb236cd9 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py @@ -124,6 +124,7 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph): use_legacy_lineage_method=False, validate_upstreams_against_patterns=False, include_operational_stats=True, + email_as_user_identifier=True, start_time=datetime(2022, 6, 6, 7, 17, 0, 0).replace( tzinfo=timezone.utc ),