diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/gaussian_smoothing.py b/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/gaussian_smoothing.py
index 156acf387..fa95350c4 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/gaussian_smoothing.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/gaussian_smoothing.py
@@ -10,6 +10,7 @@
 )
 from ..interfaces import DataManipulationBaseInterface
 
+
 class GaussianSmoothing(DataManipulationBaseInterface):
     """
     Applies Gaussian smoothing to specified columns of a PySpark DataFrame.
@@ -59,13 +60,13 @@ class GaussianSmoothing(DataManipulationBaseInterface):
     """
 
     def __init__(
-            self,
-            df: PySparkDataFrame,
-            sigma: float,
-            mode: str = "temporal",
-            id_col: str = "id",
-            timestamp_col: str = "timestamp",
-            value_col: str = "value",
+        self,
+        df: PySparkDataFrame,
+        sigma: float,
+        mode: str = "temporal",
+        id_col: str = "id",
+        timestamp_col: str = "timestamp",
+        value_col: str = "value",
     ) -> None:
 
         if not isinstance(df, PySparkDataFrame):
@@ -129,11 +130,10 @@ def filter(self) -> PySparkDataFrame:
 
             smoothed_values = gaussian_filter1d(values_sorted, sigma=self.sigma)
 
-
             reverse_indices = np.argsort(sorted_indices)
             pdf.loc[mask, self.value_col] = smoothed_values[reverse_indices]
 
         spark = self.df.sparkSession
         result_df = spark.createDataFrame(pdf)
 
-        return result_df
\ No newline at end of file
+        return result_df
diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/test_gaussian_smoothing.py b/tests/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/test_gaussian_smoothing.py
index 607f91762..ff0f8d50a 100644
--- a/tests/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/test_gaussian_smoothing.py
+++ b/tests/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/test_gaussian_smoothing.py
@@ -16,7 +16,7 @@
 from pyspark.sql import SparkSession
 
 from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.gaussian_smoothing import (
-    GaussianSmoothing
+    GaussianSmoothing,
 )
 
 
@@ -40,7 +40,7 @@ def test_gaussian_smoothing_temporal(spark_session: SparkSession):
             ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
             ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
         ],
-        ["TagName", "EventTime", "Status", "Value"]
+        ["TagName", "EventTime", "Status", "Value"],
     )
 
     # Apply smoothing
@@ -50,13 +50,15 @@ def test_gaussian_smoothing_temporal(spark_session: SparkSession):
         id_col="TagName",
         mode="temporal",
         timestamp_col="EventTime",
-        value_col="Value"
+        value_col="Value",
     )
 
     result_df = smoother.filter()
     result_pdf = result_df.toPandas()
     original_pdf = df.toPandas()
-    assert not result_pdf["Value"].equals(original_pdf["Value"]), "Values should be smoothed and not identical"
+    assert not result_pdf["Value"].equals(
+        original_pdf["Value"]
+    ), "Values should be smoothed and not identical"
 
 
 def test_gaussian_smoothing_spatial(spark_session: SparkSession):
@@ -68,7 +70,7 @@ def test_gaussian_smoothing_spatial(spark_session: SparkSession):
             ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
             ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
         ],
-        ["TagName", "EventTime", "Status", "Value"]
+        ["TagName", "EventTime", "Status", "Value"],
    )
 
     # Apply smoothing
@@ -78,13 +80,15 @@ def test_gaussian_smoothing_spatial(spark_session: SparkSession):
         id_col="TagName",
         mode="spatial",
         timestamp_col="EventTime",
-        value_col="Value"
+        value_col="Value",
     )
 
     result_df = smoother.filter()
     result_pdf = result_df.toPandas()
     original_pdf = df.toPandas()
-    assert not result_pdf["Value"].equals(original_pdf["Value"]), "Values should be smoothed and not identical"
+    assert not result_pdf["Value"].equals(
+        original_pdf["Value"]
+    ), "Values should be smoothed and not identical"
 
 
 def test_interval_detection_large_data_set(spark_session: SparkSession):
@@ -94,18 +98,18 @@ def test_interval_detection_large_data_set(spark_session: SparkSession):
 
     df = spark_session.read.option("header", "true").csv(file_path)
 
-
     smoother = GaussianSmoothing(
         df=df,
         sigma=1,
         id_col="TagName",
         mode="temporal",
         timestamp_col="EventTime",
-        value_col="Value"
+        value_col="Value",
     )
 
     actual_df = smoother.filter()
 
+
 def test_gaussian_smoothing_invalid_mode(spark_session: SparkSession):
     # Create test data
     df = spark_session.createDataFrame(
@@ -116,7 +120,7 @@ def test_gaussian_smoothing_invalid_mode(spark_session: SparkSession):
             ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
             ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
         ],
-        ["TagName", "EventTime", "Status", "Value"]
+        ["TagName", "EventTime", "Status", "Value"],
     )
 
     # Attempt to initialize with an invalid mode
@@ -127,8 +131,5 @@ def test_gaussian_smoothing_invalid_mode(spark_session: SparkSession):
         id_col="TagName",
         mode="invalid_mode",  # Invalid mode
         timestamp_col="EventTime",
-        value_col="Value"
+        value_col="Value",
     )
-
-
-