diff --git a/Deliverables/documentation/Flatline Filter - Real Time Data Ingestion Platform.htm b/Deliverables/documentation/Flatline Filter - Real Time Data Ingestion Platform.htm
new file mode 100644
index 000000000..599a74409
--- /dev/null
+++ b/Deliverables/documentation/Flatline Filter - Real Time Data Ingestion Platform.htm
@@ -0,0 +1,7619 @@
+ Flatline Filter - Real Time Data Ingestion Platform
Flatline Filter

+ FlatlineFilter

+ Bases: DataManipulationBaseInterface
Removes and logs rows with flatlining detected in specified columns of a PySpark DataFrame.


Parameters:

| Name               | Type      | Description                                                                             | Default  |
|--------------------|-----------|-----------------------------------------------------------------------------------------|----------|
| df                 | DataFrame | The input DataFrame to process.                                                           | required |
| watch_columns      | list      | List of column names to monitor for flatlining (null or zero values).                    | required |
| tolerance_timespan | int       | Maximum allowed consecutive flatlining period. Rows exceeding this period are removed.   | required |
+ Example
from pyspark.sql import SparkSession
+from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.flatline_filter import FlatlineFilter
+
+spark = SparkSession.builder.master("local[1]").appName("FlatlineFilterExample").getOrCreate()
+
+# Example DataFrame
+data = [
+    (1, "2024-01-02 03:49:45.000", 0.0),
+    (1, "2024-01-02 03:50:45.000", 0.0),
+    (1, "2024-01-02 03:51:45.000", 0.0),
+    (2, "2024-01-02 03:49:45.000", 5.0),
+]
+columns = ["TagName", "EventTime", "Value"]
+df = spark.createDataFrame(data, columns)
+
+filter_flatlining_rows = FlatlineFilter(
+    df=df,
+    watch_columns=["Value"],
+    tolerance_timespan=2,
+)
+
+result_df = filter_flatlining_rows.filter()
+result_df.show()
+
+ Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/flatline_filter.py
class FlatlineFilter(DataManipulationBaseInterface):
+    """
+    Removes and logs rows with flatlining detected in specified columns of a PySpark DataFrame.
+
+    Args:
+        df (pyspark.sql.DataFrame): The input DataFrame to process.
+        watch_columns (list): List of column names to monitor for flatlining (null or zero values).
+        tolerance_timespan (int): Maximum allowed consecutive flatlining period. Rows exceeding this period are removed.
+
+    Example:
+        ```python
+        from pyspark.sql import SparkSession
+        from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.flatline_filter import FlatlineFilter
+
+        spark = SparkSession.builder.master("local[1]").appName("FlatlineFilterExample").getOrCreate()
+
+        # Example DataFrame
+        data = [
+            (1, "2024-01-02 03:49:45.000", 0.0),
+            (1, "2024-01-02 03:50:45.000", 0.0),
+            (1, "2024-01-02 03:51:45.000", 0.0),
+            (2, "2024-01-02 03:49:45.000", 5.0),
+        ]
+        columns = ["TagName", "EventTime", "Value"]
+        df = spark.createDataFrame(data, columns)
+
+        filter_flatlining_rows = FlatlineFilter(
+            df=df,
+            watch_columns=["Value"],
+            tolerance_timespan=2,
+        )
+
+        result_df = filter_flatlining_rows.filter()
+        result_df.show()
+        ```
+    """
+
+    def __init__(
+        self, df: PySparkDataFrame, watch_columns: list, tolerance_timespan: int
+    ) -> None:
+        self.df = df
+        self.flatline_detection = FlatlineDetection(
+            df=df, watch_columns=watch_columns, tolerance_timespan=tolerance_timespan
+        )
+
+    @staticmethod
+    def system_type():
+        return SystemType.PYSPARK
+
+    @staticmethod
+    def libraries():
+        libraries = Libraries()
+        return libraries
+
+    @staticmethod
+    def settings() -> dict:
+        return {}
+
+    def filter(self) -> PySparkDataFrame:
+        """
+        Removes rows with flatlining detected.
+
+        Returns:
+            pyspark.sql.DataFrame: A DataFrame without rows with flatlining detected.
+        """
+        flatlined_rows = self.flatline_detection.check_for_flatlining()
+        flatlined_rows = flatlined_rows.select(*self.df.columns)
+        return self.df.subtract(flatlined_rows)
+
+

+ filter()

Removes rows with flatlining detected.


Returns:

| Type      | Description                                         |
|-----------|-----------------------------------------------------|
| DataFrame | A DataFrame without rows with flatlining detected.  |
+ Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/flatline_filter.py
def filter(self) -> PySparkDataFrame:
+    """
+    Removes rows with flatlining detected.
+
+    Returns:
+        pyspark.sql.DataFrame: A DataFrame without rows with flatlining detected.
+    """
+    flatlined_rows = self.flatline_detection.check_for_flatlining()
+    flatlined_rows = flatlined_rows.select(*self.df.columns)
+    return self.df.subtract(flatlined_rows)
+
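A note on the listing above: the removal step relies on PySpark's `DataFrame.subtract`, which behaves like SQL `EXCEPT DISTINCT`. The sketch below is a minimal, self-contained illustration of that pattern, independent of the SDK; the data and the `flagged` DataFrame are illustrative stand-ins for the output of the detection step.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("SubtractSketch").getOrCreate()

# Input rows, including two identical flatlined readings for tag 1.
df = spark.createDataFrame(
    [(1, 0.0), (1, 0.0), (2, 5.0)],
    ["TagName", "Value"],
)

# Stand-in for the rows a detection step has flagged, restricted to the same
# columns as df so the set difference lines up.
flagged = spark.createDataFrame([(1, 0.0)], ["TagName", "Value"])

# subtract() keeps the distinct rows of df that do not appear in flagged,
# so both identical (1, 0.0) rows are dropped here.
df.subtract(flagged).show()
```

Because `subtract` works on distinct rows, exact duplicates in the input collapse to a single row (or disappear entirely if flagged), which is worth keeping in mind when the source data can legitimately contain duplicate readings.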
\ No newline at end of file
diff --git a/docs/sdk/code-reference/pipelines/data_quality/data_manipulation/spark/prediction/arima.md b/docs/sdk/code-reference/pipelines/data_quality/data_manipulation/spark/prediction/arima.md
new file mode 100644
index 000000000..5f8008516
--- /dev/null
+++ b/docs/sdk/code-reference/pipelines/data_quality/data_manipulation/spark/prediction/arima.md
@@ -0,0 +1 @@
+::: src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.prediction.arima
diff --git a/docs/sdk/code-reference/pipelines/data_quality/data_manipulation/spark/prediction/auto_arima.md b/docs/sdk/code-reference/pipelines/data_quality/data_manipulation/spark/prediction/auto_arima.md
new file mode 100644
index 000000000..33bb559b9
--- /dev/null
+++ b/docs/sdk/code-reference/pipelines/data_quality/data_manipulation/spark/prediction/auto_arima.md
@@ -0,0 +1 @@
+::: src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.prediction.auto_arima
diff --git a/docs/sdk/code-reference/pipelines/data_quality/monitoring/spark/moving_average.md b/docs/sdk/code-reference/pipelines/data_quality/monitoring/spark/moving_average.md
new file mode 100644
index 000000000..0b13b472d
--- /dev/null
+++ b/docs/sdk/code-reference/pipelines/data_quality/monitoring/spark/moving_average.md
@@ -0,0 +1 @@
+::: src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.spark.moving_average
\ No newline at end of file
diff --git a/mkdocs.yml b/mkdocs.yml
index 4ac23f5a6..ed7091b2b 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -236,18 +236,35 @@ nav:
       - Deploy:
         - Databricks: sdk/code-reference/pipelines/deploy/databricks.md
       - Data Quality:
-        - Monitoring:
+        - Monitoring:
           - Check Value Ranges: sdk/code-reference/pipelines/data_quality/monitoring/spark/check_value_ranges.md
-          - Great Expectations:
-            - Data Quality Monitoring: sdk/code-reference/pipelines/data_quality/monitoring/spark/great_expectations.md
+          - Great Expectations:
+            - Data Quality Monitoring: sdk/code-reference/pipelines/data_quality/monitoring/spark/great_expectations.md
           - Flatline Detection: sdk/code-reference/pipelines/data_quality/monitoring/spark/flatline_detection.md
           - Identify Missing Data:
             - Interval Based: sdk/code-reference/pipelines/data_quality/monitoring/spark/identify_missing_data_interval.md
             - Pattern Based: sdk/code-reference/pipelines/data_quality/monitoring/spark/identify_missing_data_pattern.md
+          - Moving Average: sdk/code-reference/pipelines/data_quality/monitoring/spark/moving_average.md
         - Data Manipulation:
           - Duplicate Detection: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/duplicate_detection.md
-          - Filter Out of Range Values: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/delete_out_of_range_values.md
+          - Filter Out of Range Values: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/out_of_range_value_filter.md
           - Flatline Filter: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/flatline_filter.md
+          - Dimensionality Reduction: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/dimensionality_reduction.md
+          - Interval Filtering: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/interval_filtering.md
+          - K-Sigma Anomaly Detection: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/k_sigma_anomaly_detection.md
+          - Missing Value Imputation: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/missing_value_imputation.md
+          - Normalization:
+            - Normalization: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/normalization/normalization.md
+            - Normalization Mean: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/normalization/normalization_mean.md
+            - Normalization MinMax: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/normalization/normalization_minmax.md
+            - Normalization ZScore: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/normalization/normalization_zscore.md
+          - Prediction:
+            - Arima: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/prediction/arima.md
+            - Auto Arima: sdk/code-reference/pipelines/data_quality/data_manipulation/spark/prediction/auto_arima.md
+      - Machine Learning:
+        - Data Binning: sdk/code-reference/pipelines/machine_learning/spark/data_binning.md
+        - Linear Regression: sdk/code-reference/pipelines/machine_learning/spark/linear_regression.md
+
     - Jobs: sdk/pipelines/jobs.md
     - Deploy:
       - Databricks Workflows: sdk/pipelines/deploy/databricks.md
@@ -339,4 +356,4 @@ nav:
     - blog/index.md
     - University:
       - University: university/overview.md
-
\ No newline at end of file
+
diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/prediction/arima.py b/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/prediction/arima.py
index ea70120d1..ad4446f0d 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/prediction/arima.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/prediction/arima.py
@@ -99,7 +99,6 @@ class ArimaPrediction(DataManipulationBaseInterface, InputValidator):
         timestamp_name (str): Name of column, where event timestamps are stored
         source_name (str): Name of column in source-based format, where source of events are stored
         status_name (str): Name of column in source-based format, where status of events are stored
-        # Options for ARIMA
         external_regressor_names (List[str]): Currently not working. Names of the columns with data to use for prediction, but not extend
         number_of_data_points_to_predict (int): Amount of points to forecast
         number_of_data_points_to_analyze (int): Amount of most recent points to train on
@@ -319,7 +318,7 @@ def filter(self) -> PySparkDataFrame:
            value imputation to prevent learning on dirty data.

         Returns:
-            DataFrame: A PySpark DataFrame with forcasted value entries depending on constructor parameters.
+            DataFrame: A PySpark DataFrame with forecasted value entries depending on constructor parameters.
         """
         # expected_scheme = StructType(
         #     [
diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/prediction/auto_arima.py b/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/prediction/auto_arima.py
index 2f5ef22e6..808bd3ced 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/prediction/auto_arima.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/prediction/auto_arima.py
@@ -77,8 +77,6 @@ class ArimaAutoPrediction(ArimaPrediction):
         number_of_data_points_to_predict (int): Amount of points to forecast
         number_of_data_points_to_analyze (int): Amount of most recent points to train on
         seasonal (bool): Setting for AutoArima, is past_data seasonal?
-        # Options for ARIMA
-        trend (str): ARIMA-Specific setting
         enforce_stationarity (bool): ARIMA-Specific setting
         enforce_invertibility (bool): ARIMA-Specific setting
         concentrate_scale (bool): ARIMA-Specific setting
diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/moving_average.py b/src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/moving_average.py
new file mode 100644
index 000000000..163086db8
--- /dev/null
+++ b/src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/moving_average.py
@@ -0,0 +1,135 @@
+import logging
+from pyspark.sql import DataFrame as PySparkDataFrame
+from pyspark.sql.functions import col, avg
+from pyspark.sql.window import Window
+from pyspark.sql.types import (
+    StructType,
+    StructField,
+    StringType,
+    TimestampType,
+    FloatType,
+)
+
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.interfaces import (
+    MonitoringBaseInterface,
+)
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)
+from ...input_validator import InputValidator
+
+
+class MovingAverage(MonitoringBaseInterface, InputValidator):
+    """
+    Computes and logs the moving average over a specified window size for a given PySpark DataFrame.
+
+    Args:
+        df (pyspark.sql.DataFrame): The DataFrame to process.
+        window_size (int): The size of the moving window.
+
+    Example:
+        ```python
+        from pyspark.sql import SparkSession
+        from rtdip_sdk.pipelines.data_quality.monitoring.spark.moving_average import MovingAverage
+
+        spark = SparkSession.builder.master("local[1]").appName("MovingAverageExample").getOrCreate()
+
+        data = [
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", 1.0),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", 2.0),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", 3.0),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", 4.0),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", 5.0),
+        ]
+
+        columns = ["TagName", "EventTime", "Status", "Value"]
+
+        df = spark.createDataFrame(data, columns)
+
+        moving_avg = MovingAverage(
+            df=df,
+            window_size=3,
+        )
+
+        moving_avg.check()
+        ```
+    """
+
+    df: PySparkDataFrame
+    window_size: int
+    EXPECTED_SCHEMA = StructType(
+        [
+            StructField("TagName", StringType(), True),
+            StructField("EventTime", TimestampType(), True),
+            StructField("Status", StringType(), True),
+            StructField("Value", FloatType(), True),
+        ]
+    )
+
+    def __init__(
+        self,
+        df: PySparkDataFrame,
+        window_size: int,
+    ) -> None:
+        if not isinstance(window_size, int) or window_size <= 0:
+            raise ValueError("window_size must be a positive integer.")
+
+        self.df = df
+        self.validate(self.EXPECTED_SCHEMA)
+        self.window_size = window_size
+
+        self.logger = logging.getLogger(self.__class__.__name__)
+        if not self.logger.handlers:
+            handler = logging.StreamHandler()
+            formatter = logging.Formatter(
+                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+            )
+            handler.setFormatter(formatter)
+            self.logger.addHandler(handler)
+            self.logger.setLevel(logging.INFO)
+
+    @staticmethod
+    def system_type():
+        """
+        Attributes:
+            SystemType (Environment): Requires PYSPARK
+        """
+        return SystemType.PYSPARK
+
+    @staticmethod
+    def libraries():
+        libraries = Libraries()
+        return libraries
+
+    @staticmethod
+    def settings() -> dict:
+        return {}
+
+    def check(self) -> None:
+        """
+        Computes and logs the moving average using a specified window size.
+        """
+
+        self._validate_inputs()
+
+        window_spec = (
+            Window.partitionBy("TagName")
+            .orderBy("EventTime")
+            .rowsBetween(-(self.window_size - 1), 0)
+        )
+
+        self.logger.info("Computing moving averages:")
+
+        for row in (
+            self.df.withColumn("MovingAverage", avg(col("Value")).over(window_spec))
+            .select("TagName", "EventTime", "Value", "MovingAverage")
+            .collect()
+        ):
+            self.logger.info(
+                f"Tag: {row.TagName}, Time: {row.EventTime}, Value: {row.Value}, Moving Avg: {row.MovingAverage}"
+            )
+
+    def _validate_inputs(self):
+        if not isinstance(self.window_size, int) or self.window_size <= 0:
+            raise ValueError("window_size must be a positive integer.")
diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/test_moving_average.py b/tests/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/test_moving_average.py
new file mode 100644
index 000000000..f84d66ba0
--- /dev/null
+++ b/tests/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/test_moving_average.py
@@ -0,0 +1,91 @@
+import pytest
+import os
+from pyspark.sql import SparkSession
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.spark.moving_average import (
+    MovingAverage,
+)
+import logging
+from io import StringIO
+
+
+@pytest.fixture(scope="session")
+def spark():
+    spark = (
+        SparkSession.builder.master("local[2]")
+        .appName("MovingAverageTest")
+        .getOrCreate()
+    )
+    yield spark
+    spark.stop()
+
+
+@pytest.fixture
+def log_capture():
+    log_stream = StringIO()
+    logger = logging.getLogger("MovingAverage")
+    logger.setLevel(logging.INFO)
+    handler = logging.StreamHandler(log_stream)
+    formatter = logging.Formatter("%(message)s")
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    yield log_stream
+    logger.removeHandler(handler)
+    handler.close()
+
+
+def test_moving_average_basic(spark, log_capture):
+    df = spark.createDataFrame(
+        [
+            ("Tag1", "2024-01-02 03:49:45.000", "Good", 1.0),
+            ("Tag1", "2024-01-02 07:53:11.000", "Good", 2.0),
+            ("Tag1", "2024-01-02 11:56:42.000", "Good", 3.0),
+            ("Tag1", "2024-01-02 16:00:12.000", "Good", 4.0),
+            ("Tag1", "2024-01-02 20:03:46.000", "Good", 5.0),
+        ],
+        ["TagName", "EventTime", "Status", "Value"],
+    )
+
+    detector = MovingAverage(df, window_size=3)
+    detector.check()
+
+    expected_logs = [
+        "Computing moving averages:",
+        "Tag: Tag1, Time: 2024-01-02 03:49:45, Value: 1.0, Moving Avg: 1.0",
+        "Tag: Tag1, Time: 2024-01-02 07:53:11, Value: 2.0, Moving Avg: 1.5",
+        "Tag: Tag1, Time: 2024-01-02 11:56:42, Value: 3.0, Moving Avg: 2.0",
+        "Tag: Tag1, Time: 2024-01-02 16:00:12, Value: 4.0, Moving Avg: 3.0",
+        "Tag: Tag1, Time: 2024-01-02 20:03:46, Value: 5.0, Moving Avg: 4.0",
+    ]
+
+    actual_logs = log_capture.getvalue().strip().split("\n")
+
+    assert len(expected_logs) == len(
+        actual_logs
+    ), f"Expected {len(expected_logs)} logs, got {len(actual_logs)}"
+
+    for expected, actual in zip(expected_logs, actual_logs):
+        assert expected in actual, f"Expected: '{expected}', got: '{actual}'"
+
+
+def test_moving_average_invalid_window_size(spark):
+    df = spark.createDataFrame(
+        [
+            ("Tag1", "2024-01-02 03:49:45.000", "Good", 1.0),
+            ("Tag1", "2024-01-02 07:53:11.000", "Good", 2.0),
+        ],
+        ["TagName", "EventTime", "Status", "Value"],
+    )
+
+    with pytest.raises(ValueError, match="window_size must be a positive integer."):
+        MovingAverage(df, window_size=-2)
+
+
+def test_large_dataset(spark):
+    base_path = os.path.dirname(__file__)
+    file_path = os.path.join(base_path, "../../test_data.csv")
+    df = spark.read.option("header", "true").csv(file_path)
+
+    assert df.count() > 0, "DataFrame was not loaded."
+
+    detector = MovingAverage(df, window_size=5)
+    detector.check()
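A closing note on the windowing used by MovingAverage.check above: `rowsBetween(-(window_size - 1), 0)` averages each row with the `window_size - 1` rows that precede it within the same tag, so the first rows of a series use partial windows rather than being padded or skipped. The sketch below is a minimal standalone illustration of that behaviour in plain PySpark; EventTime is simplified to integers here purely for ordering, and the names are illustrative.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[1]").appName("RollingAvgSketch").getOrCreate()

df = spark.createDataFrame(
    [("Tag1", 1, 1.0), ("Tag1", 2, 2.0), ("Tag1", 3, 3.0), ("Tag1", 4, 4.0)],
    ["TagName", "EventTime", "Value"],
)

window_size = 3
# Current row plus the two preceding rows per tag, ordered by event time.
window_spec = (
    Window.partitionBy("TagName")
    .orderBy("EventTime")
    .rowsBetween(-(window_size - 1), 0)
)

# MovingAverage column comes out as 1.0, 1.5, 2.0, 3.0 — partial windows at the start.
df.withColumn("MovingAverage", avg(col("Value")).over(window_spec)).show()
```

This matches the expectations in test_moving_average_basic, where the first two averages are 1.0 and 1.5 before the window fills up.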