forked from rtdip/core
Commit
Merge branch 'develop' of https://github.com/amosproj/amos2024ws01-rtdip-data-quality-checker into develop
Showing 9 changed files with 7,871 additions and 9 deletions.
7,619 changes: 7,619 additions & 0 deletions
Deliverables/documentation/Flatline Filter - Real Time Data Ingestion Platform.htm
Large diffs are not rendered by default.
1 change: 1 addition & 0 deletions
...de-reference/pipelines/data_quality/data_manipulation/spark/prediction/arima.md
@@ -0,0 +1 @@
::: src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.prediction.arima
1 change: 1 addition & 0 deletions
...ference/pipelines/data_quality/data_manipulation/spark/prediction/auto_arima.md
@@ -0,0 +1 @@
::: src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.prediction.auto_arima
1 change: 1 addition & 0 deletions
docs/sdk/code-reference/pipelines/data_quality/monitoring/spark/moving_average.md
@@ -0,0 +1 @@
::: src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.spark.moving_average
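The stub above is a mkdocstrings directive that renders the module's docstrings on the documentation site. For orientation, a minimal usage sketch of the documented class follows; it mirrors the class docstring and the repository-internal import path used by the tests in this commit, and is illustrative only.

```python
from pyspark.sql import SparkSession

# Import path assumed from the file location added in this commit.
from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.spark.moving_average import (
    MovingAverage,
)

spark = SparkSession.builder.master("local[1]").appName("MovingAverageDocs").getOrCreate()

df = spark.createDataFrame(
    [
        ("TagA", "2024-01-02 03:49:45.000", "Good", 1.0),
        ("TagA", "2024-01-02 07:53:11.000", "Good", 2.0),
        ("TagA", "2024-01-02 11:56:42.000", "Good", 3.0),
    ],
    ["TagName", "EventTime", "Status", "Value"],
)

# Logs the trailing 2-row moving average per tag.
MovingAverage(df=df, window_size=2).check()
```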
135 changes: 135 additions & 0 deletions
src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/moving_average.py
@@ -0,0 +1,135 @@
import logging

from pyspark.sql import DataFrame as PySparkDataFrame
from pyspark.sql.functions import col, avg
from pyspark.sql.window import Window
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    TimestampType,
    FloatType,
)

from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.interfaces import (
    MonitoringBaseInterface,
)
from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
    Libraries,
    SystemType,
)
from ...input_validator import InputValidator


class MovingAverage(MonitoringBaseInterface, InputValidator):
    """
    Computes and logs the moving average over a specified window size for a given PySpark DataFrame.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame to process.
        window_size (int): The size of the moving window.

    Example:
    ```python
    from pyspark.sql import SparkSession
    from rtdip_sdk.pipelines.data_quality.monitoring.spark.moving_average import MovingAverage

    spark = SparkSession.builder.master("local[1]").appName("MovingAverageExample").getOrCreate()

    data = [
        ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", 1.0),
        ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", 2.0),
        ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", 3.0),
        ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", 4.0),
        ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", 5.0),
    ]
    columns = ["TagName", "EventTime", "Status", "Value"]
    df = spark.createDataFrame(data, columns)

    moving_avg = MovingAverage(
        df=df,
        window_size=3,
    )
    moving_avg.check()
    ```
    """

    df: PySparkDataFrame
    window_size: int
    EXPECTED_SCHEMA = StructType(
        [
            StructField("TagName", StringType(), True),
            StructField("EventTime", TimestampType(), True),
            StructField("Status", StringType(), True),
            StructField("Value", FloatType(), True),
        ]
    )

    def __init__(
        self,
        df: PySparkDataFrame,
        window_size: int,
    ) -> None:
        if not isinstance(window_size, int) or window_size <= 0:
            raise ValueError("window_size must be a positive integer.")

        self.df = df
        self.validate(self.EXPECTED_SCHEMA)
        self.window_size = window_size

        # Attach a stream handler once so the output of check() is visible by default.
        self.logger = logging.getLogger(self.__class__.__name__)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def check(self) -> None:
        """
        Computes and logs the moving average using a specified window size.
        """
        self._validate_inputs()

        # Trailing window per tag: the current row plus the previous
        # (window_size - 1) rows, ordered by event time.
        window_spec = (
            Window.partitionBy("TagName")
            .orderBy("EventTime")
            .rowsBetween(-(self.window_size - 1), 0)
        )

        self.logger.info("Computing moving averages:")

        # collect() brings the result to the driver so each row can be logged.
        for row in (
            self.df.withColumn("MovingAverage", avg(col("Value")).over(window_spec))
            .select("TagName", "EventTime", "Value", "MovingAverage")
            .collect()
        ):
            self.logger.info(
                f"Tag: {row.TagName}, Time: {row.EventTime}, Value: {row.Value}, Moving Avg: {row.MovingAverage}"
            )

    def _validate_inputs(self):
        if not isinstance(self.window_size, int) or self.window_size <= 0:
            raise ValueError("window_size must be a positive integer.")
91 changes: 91 additions & 0 deletions
tests/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/test_moving_average.py
@@ -0,0 +1,91 @@
import pytest
import os
from pyspark.sql import SparkSession
from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.spark.moving_average import (
    MovingAverage,
)
import logging
from io import StringIO


@pytest.fixture(scope="session")
def spark():
    spark = (
        SparkSession.builder.master("local[2]")
        .appName("MovingAverageTest")
        .getOrCreate()
    )
    yield spark
    spark.stop()


@pytest.fixture
def log_capture():
    # Capture the MovingAverage logger output so the tests can assert on it.
    log_stream = StringIO()
    logger = logging.getLogger("MovingAverage")
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(log_stream)
    formatter = logging.Formatter("%(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    yield log_stream
    logger.removeHandler(handler)
    handler.close()


def test_moving_average_basic(spark, log_capture):
    df = spark.createDataFrame(
        [
            ("Tag1", "2024-01-02 03:49:45.000", "Good", 1.0),
            ("Tag1", "2024-01-02 07:53:11.000", "Good", 2.0),
            ("Tag1", "2024-01-02 11:56:42.000", "Good", 3.0),
            ("Tag1", "2024-01-02 16:00:12.000", "Good", 4.0),
            ("Tag1", "2024-01-02 20:03:46.000", "Good", 5.0),
        ],
        ["TagName", "EventTime", "Status", "Value"],
    )

    detector = MovingAverage(df, window_size=3)
    detector.check()

    expected_logs = [
        "Computing moving averages:",
        "Tag: Tag1, Time: 2024-01-02 03:49:45, Value: 1.0, Moving Avg: 1.0",
        "Tag: Tag1, Time: 2024-01-02 07:53:11, Value: 2.0, Moving Avg: 1.5",
        "Tag: Tag1, Time: 2024-01-02 11:56:42, Value: 3.0, Moving Avg: 2.0",
        "Tag: Tag1, Time: 2024-01-02 16:00:12, Value: 4.0, Moving Avg: 3.0",
        "Tag: Tag1, Time: 2024-01-02 20:03:46, Value: 5.0, Moving Avg: 4.0",
    ]

    actual_logs = log_capture.getvalue().strip().split("\n")

    assert len(expected_logs) == len(
        actual_logs
    ), f"Expected {len(expected_logs)} logs, got {len(actual_logs)}"

    for expected, actual in zip(expected_logs, actual_logs):
        assert expected in actual, f"Expected: '{expected}', got: '{actual}'"


def test_moving_average_invalid_window_size(spark):
    df = spark.createDataFrame(
        [
            ("Tag1", "2024-01-02 03:49:45.000", "Good", 1.0),
            ("Tag1", "2024-01-02 07:53:11.000", "Good", 2.0),
        ],
        ["TagName", "EventTime", "Status", "Value"],
    )

    with pytest.raises(ValueError, match="window_size must be a positive integer."):
        MovingAverage(df, window_size=-2)


def test_large_dataset(spark):
    base_path = os.path.dirname(__file__)
    file_path = os.path.join(base_path, "../../test_data.csv")
    df = spark.read.option("header", "true").csv(file_path)

    assert df.count() > 0, "DataFrame was not loaded."

    detector = MovingAverage(df, window_size=5)
    detector.check()