Skip to content

Commit

Permalink
#106: Linted
Browse files (browse the repository at this point in the history)
Signed-off-by: Dominik Hoffmann <[email protected]>
  • Loading branch information
dh1542 committed Jan 21, 2025
1 parent 0419455 commit 61ebc77
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 23 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -10,6 +10,7 @@
)
from ..interfaces import DataManipulationBaseInterface


class GaussianSmoothing(DataManipulationBaseInterface):
"""
Applies Gaussian smoothing to specified columns of a PySpark DataFrame.
Expand Down Expand Up @@ -59,13 +60,13 @@ class GaussianSmoothing(DataManipulationBaseInterface):
"""

def __init__(
self,
df: PySparkDataFrame,
sigma: float,
mode: str = "temporal",
id_col: str = "id",
timestamp_col: str = "timestamp",
value_col: str = "value",
self,
df: PySparkDataFrame,
sigma: float,
mode: str = "temporal",
id_col: str = "id",
timestamp_col: str = "timestamp",
value_col: str = "value",
) -> None:

if not isinstance(df, PySparkDataFrame):
Expand Down Expand Up @@ -129,11 +130,10 @@ def filter(self) -> PySparkDataFrame:

smoothed_values = gaussian_filter1d(values_sorted, sigma=self.sigma)


reverse_indices = np.argsort(sorted_indices)
pdf.loc[mask, self.value_col] = smoothed_values[reverse_indices]

spark = self.df.sparkSession
result_df = spark.createDataFrame(pdf)

return result_df
return result_df
Original file line number | Diff line number | Diff line change
Expand Up @@ -16,7 +16,7 @@
from pyspark.sql import SparkSession

from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.gaussian_smoothing import (
GaussianSmoothing
GaussianSmoothing,
)


Expand All @@ -40,7 +40,7 @@ def test_gaussian_smoothing_temporal(spark_session: SparkSession):
("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
],
["TagName", "EventTime", "Status", "Value"]
["TagName", "EventTime", "Status", "Value"],
)

# Apply smoothing
Expand All @@ -50,13 +50,15 @@ def test_gaussian_smoothing_temporal(spark_session: SparkSession):
id_col="TagName",
mode="temporal",
timestamp_col="EventTime",
value_col="Value"
value_col="Value",
)
result_df = smoother.filter()
result_pdf = result_df.toPandas()
original_pdf = df.toPandas()

assert not result_pdf["Value"].equals(original_pdf["Value"]), "Values should be smoothed and not identical"
assert not result_pdf["Value"].equals(
original_pdf["Value"]
), "Values should be smoothed and not identical"


def test_gaussian_smoothing_spatial(spark_session: SparkSession):
Expand All @@ -68,7 +70,7 @@ def test_gaussian_smoothing_spatial(spark_session: SparkSession):
("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
],
["TagName", "EventTime", "Status", "Value"]
["TagName", "EventTime", "Status", "Value"],
)

# Apply smoothing
Expand All @@ -78,13 +80,15 @@ def test_gaussian_smoothing_spatial(spark_session: SparkSession):
id_col="TagName",
mode="spatial",
timestamp_col="EventTime",
value_col="Value"
value_col="Value",
)
result_df = smoother.filter()
result_pdf = result_df.toPandas()
original_pdf = df.toPandas()

assert not result_pdf["Value"].equals(original_pdf["Value"]), "Values should be smoothed and not identical"
assert not result_pdf["Value"].equals(
original_pdf["Value"]
), "Values should be smoothed and not identical"


def test_interval_detection_large_data_set(spark_session: SparkSession):
Expand All @@ -94,18 +98,18 @@ def test_interval_detection_large_data_set(spark_session: SparkSession):

df = spark_session.read.option("header", "true").csv(file_path)


smoother = GaussianSmoothing(
df=df,
sigma=1,
id_col="TagName",
mode="temporal",
timestamp_col="EventTime",
value_col="Value"
value_col="Value",
)

actual_df = smoother.filter()


def test_gaussian_smoothing_invalid_mode(spark_session: SparkSession):
# Create test data
df = spark_session.createDataFrame(
Expand All @@ -116,7 +120,7 @@ def test_gaussian_smoothing_invalid_mode(spark_session: SparkSession):
("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
],
["TagName", "EventTime", "Status", "Value"]
["TagName", "EventTime", "Status", "Value"],
)

# Attempt to initialize with an invalid mode
Expand All @@ -127,8 +131,5 @@ def test_gaussian_smoothing_invalid_mode(spark_session: SparkSession):
id_col="TagName",
mode="invalid_mode", # Invalid mode
timestamp_col="EventTime",
value_col="Value"
value_col="Value",
)



0 comments on commit 61ebc77

Please sign in to comment.