From b00ba913363955764dca51bd3bd42f9f1c3168be Mon Sep 17 00:00:00 2001
From: Dominik Hoffmann
Date: Tue, 29 Oct 2024 13:23:29 +0100
Subject: [PATCH] Revert "Feature: pipeline spark k-sigma anomaly filtering"

---
 .../data_quality/k_sigma_anomaly_detection.py | 102 ------------------
 .../test_k_sigma_anomaly_detection.py         |  93 ----------------
 2 files changed, 195 deletions(-)
 delete mode 100644 src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/k_sigma_anomaly_detection.py
 delete mode 100644 tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/test_k_sigma_anomaly_detection.py

diff --git a/src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/k_sigma_anomaly_detection.py b/src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/k_sigma_anomaly_detection.py
deleted file mode 100644
index 12cee8e38..000000000
--- a/src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/k_sigma_anomaly_detection.py
+++ /dev/null
@@ -1,102 +0,0 @@
-# Copyright 2022 RTDIP
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from pyspark.sql import DataFrame, SparkSession
-from pyspark.sql.functions import mean, stddev, median, abs, col
-from ...interfaces import MonitoringBaseInterface
-from ...._pipeline_utils.models import Libraries, SystemType
-
-
-class KSigmaAnomalyDetection(MonitoringBaseInterface):
-    """
-    Anomaly detection with the k-sigma method. This method computes either the mean and standard deviation, or the median and the median absolute deviation (MAD), of the data.
-    It then filters out every data point that lies more than k standard deviations from the mean, or more than k MADs from the median.
-    Assuming a normal distribution, this keeps roughly 99.7% of the data points when k=3 and use_median=False.
- """ - - def __init__( - self, - spark: SparkSession, - df: DataFrame, - column_names: list[str], - k_value: int = 3, - use_median: bool = False, - ) -> None: - if len(column_names) == 0: - raise Exception("You must provide at least one column name") - if len(column_names) > 1: - raise NotImplemented("Multiple columns are not supported yet") - self.column_names = column_names - - self.use_median = use_median - self.spark = spark - self.df = df - self.k_value = k_value - - @staticmethod - def system_type(): - """ - Attributes: - SystemType (Environment): Requires PYSPARK - """ - return SystemType.PYSPARK - - @staticmethod - def libraries(): - libraries = Libraries() - return libraries - - @staticmethod - def settings() -> dict: - return {} - - def filter_anomalies(self) -> DataFrame: - """ - Filter anomalies based on the k-sigma rule - """ - - column_name = self.column_names[0] - mean_value, deviation = 0, 0 - - if mean_value is None: - raise Exception("Couldn't calculate mean value") - - if self.use_median: - mean_value = self.df.select(median(column_name)).first() - if mean_value is None: - raise Exception("Couldn't calculate median value") - mean_value = mean_value[0] - - deviation = self.df.agg(median(abs(col(column_name) - mean_value))).first() - if deviation is None: - raise Exception("Couldn't calculate mean value") - deviation = deviation[0] - else: - stats = self.df.select( - mean(column_name), stddev(self.column_names[0]) - ).first() - if stats is None: - raise Exception("Couldn't calculate mean value and standard deviation") - - mean_value = stats[0] - deviation = stats[1] - - shift = self.k_value * deviation - lower_bound = mean_value - shift - upper_bound = mean_value + shift - - return self.df.filter( - (self.df[column_name] >= lower_bound) - & (self.df[column_name] <= upper_bound) - ) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/test_k_sigma_anomaly_detection.py b/tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/test_k_sigma_anomaly_detection.py deleted file mode 100644 index 3e56255a2..000000000 --- a/tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/test_k_sigma_anomaly_detection.py +++ /dev/null @@ -1,93 +0,0 @@ -from pyspark.sql import SparkSession - -from src.sdk.python.rtdip_sdk.pipelines.monitoring.spark.data_quality.k_sigma_anomaly_detection import ( - KSigmaAnomalyDetection, -) - -# Normal data mean=10 stddev=5 + 3 anomalies -# fmt: off -normal_input_values = [ 5.19811497, 8.34437927, 3.62104032, 10.02819525, 6.1183447 , - 20.10067378, 10.32313075, 14.090119 , 21.43078927, 2.76624332, - 10.84089416, 1.90722629, 11.19750641, 13.70925639, 5.61011921, - 4.50072694, 13.79440311, 13.30173747, 7.07183589, 12.79853139, 100] - -normal_expected_values = [ 5.19811497, 8.34437927, 3.62104032, 10.02819525, 6.1183447 , - 20.10067378, 10.32313075, 14.090119 , 21.43078927, 2.76624332, - 10.84089416, 1.90722629, 11.19750641, 13.70925639, 5.61011921, - 4.50072694, 13.79440311, 13.30173747, 7.07183589, 12.79853139] -# fmt: on - -# These values are tricky for the mean method, as the anomaly has a big effect on the mean -input_values = [1, 2, 3, 4, 20] -expected_values = [1, 2, 3, 4] - - -def test_filter_with_mean(spark_session: SparkSession): - # Test with normal data - normal_input_df = spark_session.createDataFrame( - [(float(num),) for num in normal_input_values], schema=["value"] - ) - normal_expected_df = spark_session.createDataFrame( - [(float(num),) for num in normal_expected_values], schema=["value"] - ) - 
-    normal_filtered_df = KSigmaAnomalyDetection(
-        spark_session,
-        normal_input_df,
-        column_names=["value"],
-        k_value=3,
-        use_median=False,
-    ).filter_anomalies()
-
-    assert normal_expected_df.collect() == normal_filtered_df.collect()
-
-    # Test with data that has an anomaly that shifts the mean significantly
-    input_df = spark_session.createDataFrame(
-        [(float(num),) for num in input_values], schema=["value"]
-    )
-    expected_df = spark_session.createDataFrame(
-        [(float(num),) for num in expected_values], schema=["value"]
-    )
-
-    filtered_df = KSigmaAnomalyDetection(
-        spark_session, input_df, column_names=["value"], k_value=3, use_median=False
-    ).filter_anomalies()
-
-    # The mean method misses the anomaly here: the outlier (20) inflates the
-    # mean and the standard deviation enough that every point, including the
-    # anomaly itself, stays inside the k-sigma band.
-    assert expected_df.collect() != filtered_df.collect()
-
-
-def test_filter_with_median(spark_session: SparkSession):
-    # Test with normal data
-    normal_input_df = spark_session.createDataFrame(
-        [(float(num),) for num in normal_input_values], schema=["value"]
-    )
-    normal_expected_df = spark_session.createDataFrame(
-        [(float(num),) for num in normal_expected_values], schema=["value"]
-    )
-
-    normal_filtered_df = KSigmaAnomalyDetection(
-        spark_session,
-        normal_input_df,
-        column_names=["value"],
-        k_value=3,
-        use_median=True,
-    ).filter_anomalies()
-
-    assert normal_expected_df.collect() == normal_filtered_df.collect()
-
-    # Test with data that has an anomaly that shifts the mean significantly
-    input_df = spark_session.createDataFrame(
-        [(float(num),) for num in input_values], schema=["value"]
-    )
-    expected_df = spark_session.createDataFrame(
-        [(float(num),) for num in expected_values], schema=["value"]
-    )
-
-    filtered_df = KSigmaAnomalyDetection(
-        spark_session, input_df, column_names=["value"], k_value=3, use_median=True
-    ).filter_anomalies()
-
-    assert expected_df.collect() == filtered_df.collect()
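
For reference, the deleted tests exercised the reverted class as sketched below.
This is a minimal usage sketch: the active SparkSession named `spark` and the
installed-package import path are assumptions (the deleted tests imported the
module via its in-repo src.sdk.python path instead).

    from rtdip_sdk.pipelines.monitoring.spark.data_quality.k_sigma_anomaly_detection import (
        KSigmaAnomalyDetection,
    )

    # Five readings with one obvious outlier (20), as in the deleted tests.
    df = spark.createDataFrame([(float(v),) for v in [1, 2, 3, 4, 20]], ["value"])

    # Keep rows within 3 MADs of the median; use_median=False selects the
    # mean/standard-deviation variant instead.
    filtered = KSigmaAnomalyDetection(
        spark, df, column_names=["value"], k_value=3, use_median=True
    ).filter_anomalies()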
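The k-sigma rule itself does not depend on the reverted wrapper. An equivalent
standalone PySpark sketch of the mean/standard-deviation variant (the column
name and data are illustrative only):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, mean, stddev

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(float(v),) for v in [1, 2, 3, 4, 20]], ["value"])

    # Bounds are mean +/- k * stddev; for normally distributed data and k=3,
    # roughly 99.7% of the points fall inside them.
    k = 3
    stats = df.select(mean("value"), stddev("value")).first()
    lower, upper = stats[0] - k * stats[1], stats[0] + k * stats[1]
    filtered = df.filter((col("value") >= lower) & (col("value") <= upper))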