Commit 2783c91

Tran committed Oct 29, 2024
2 parents d782f13 + 451dbd9
Showing 5 changed files with 138 additions and 2 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -136,4 +136,7 @@ spark-warehouse/
spark-checkpoints/

# Delta Sharing
-config.share
+config.share
+
+# JetBrains
+.idea/
2 changes: 1 addition & 1 deletion setup.py
@@ -60,7 +60,7 @@
"azure-keyvault-secrets>=4.7.0,<5.0.0",
"web3>=6.18.0,<7.0.0",
"polars[deltalake]>=0.18.8,<1.0.0",
"delta-sharing>=1.0.0,<1.1.0",
"delta-sharing>=1.0.0,<2.0.0",
"xarray>=2023.1.0,<2023.8.0",
"ecmwf-api-client>=1.6.3,<2.0.0",
"netCDF4>=1.6.4,<2.0.0",
1 change: 1 addition & 0 deletions src/sdk/python/rtdip_sdk/pipelines/monitoring/__init__.py
@@ -12,3 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .spark.data_quality.great_expectations_data_quality import *
+from .spark.data_quality.duplicate_detection import *
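Because the new module is re-exported with a wildcard import, the class should also be reachable from the monitoring package directly; a minimal sketch, assuming the rtdip-sdk package is installed:

```python
# The wildcard re-export above should make this shorter import path work:
from rtdip_sdk.pipelines.monitoring import DuplicateDetection
```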
72 changes: 72 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/duplicate_detection.py
@@ -0,0 +1,72 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyspark.sql.functions import desc
from pyspark.sql import DataFrame as PySparkDataFrame

from ...interfaces import MonitoringBaseInterface
from ...._pipeline_utils.models import Libraries, SystemType


class DuplicateDetection(MonitoringBaseInterface):
    """
    Removes duplicate rows from a PySpark DataFrame, keyed on the TagName and EventTime columns.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.monitoring.spark.data_quality.duplicate_detection import DuplicateDetection

    duplicate_detection_monitor = DuplicateDetection(df)
    result = duplicate_detection_monitor.filter()
    ```

    Parameters:
        df (DataFrame): PySpark DataFrame to be cleansed of duplicate rows
    """

    df: PySparkDataFrame

    def __init__(self, df: PySparkDataFrame) -> None:
        self.df = df

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def filter(self) -> PySparkDataFrame:
        """
        Returns:
            DataFrame: A PySpark DataFrame with duplicate rows removed, ordered by EventTime descending.
        """
        cleansed_df = self.df.dropDuplicates(["TagName", "EventTime"]).orderBy(
            desc("EventTime")
        )
        return cleansed_df
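As a standalone usage sketch (the local SparkSession, tag name, and values below are illustrative assumptions, not part of the commit):

```python
from pyspark.sql import SparkSession

from rtdip_sdk.pipelines.monitoring.spark.data_quality.duplicate_detection import (
    DuplicateDetection,
)

# Local session for the sketch; in a real pipeline a session would already exist.
spark = SparkSession.builder.master("local[2]").appName("duplicates").getOrCreate()

# "TAG_A" and the timestamps are hypothetical sample data.
df = spark.createDataFrame(
    [
        ("TAG_A", "2024-01-02 20:03:46.000", "Good", "0.34"),
        ("TAG_A", "2024-01-02 20:03:46.000", "Good", "0.34"),  # duplicate key
        ("TAG_A", "2024-01-02 16:00:12.000", "Good", "0.15"),
    ],
    ["TagName", "EventTime", "Status", "Value"],
)

# dropDuplicates keeps one row per (TagName, EventTime) pair; the result is
# then ordered by EventTime descending, so two rows remain here.
result = DuplicateDetection(df).filter()
result.show(truncate=False)
```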
60 changes: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest

from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame

from src.sdk.python.rtdip_sdk.pipelines.monitoring.spark.data_quality.duplicate_detection import DuplicateDetection


@pytest.fixture(scope="session")
def spark_session():
return SparkSession.builder.master("local[2]").appName("test").getOrCreate()


def test_duplicate_detection(spark_session: SparkSession):
    expected_df = spark_session.createDataFrame(
        [
            ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"),
        ],
        ["TagName", "EventTime", "Status", "Value"],
    )

    # Input contains three duplicate rows: one extra at 20:03:46 and two extra at 07:53:11.
    df = spark_session.createDataFrame(
        [
            ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"),
        ],
        ["TagName", "EventTime", "Status", "Value"],
    )

    duplicate_detection_monitor = DuplicateDetection(df)
    actual_df = duplicate_detection_monitor.filter()

    assert isinstance(actual_df, DataFrame)
    assert expected_df.columns == actual_df.columns
    assert expected_df.schema == actual_df.schema
    assert expected_df.collect() == actual_df.collect()
