Pipeline Demo Notebooks #117

Draft · wants to merge 35 commits into base: develop
Commits (35)
67fc1af  Adds Notebook for Normalization (Timm638, Nov 18, 2024)
1a9efa7  Extends notebook by Duplicate Detection (Timm638, Nov 18, 2024)
6143796  Adds K Sigma Filtering as Test (Timm638, Nov 19, 2024)
3178f7a  Finished up plot of notebook (Timm638, Nov 19, 2024)
7a0559f  Merge branch 'develop' into feature/043_rtdip_demo (Timm638, Nov 24, 2024)
61e161a  Extends notebook & fixes bug in Value Imputation (Timm638, Nov 25, 2024)
2d98588  Adds notes to the notebook (Timm638, Nov 25, 2024)
8e2ab84  Applies black to code (Timm638, Nov 25, 2024)
f0d1aa7  Merge branch 'develop' into feature/096_rtdip_demo_pipeline (Timm638, Jan 7, 2025)
97ac7c7  fixes imports by re-adding them to each data quality component (Timm638, Jan 7, 2025)
26be410  fixes imports by re-adding them to each data quality component (Timm638, Jan 7, 2025)
9867d5f  Merge remote-tracking branch 'origin/feature/096_rtdip_demo_pipeline'… (Timm638, Jan 7, 2025)
92b93ce  renamed imports (Timm638, Jan 7, 2025)
0f3e866  Applies black to files (Timm638, Jan 7, 2025)
6499181  fixes imports & writes down pipeline concept (Timm638, Jan 7, 2025)
4941426  updates showcase_notebook to current version of rtdip (Timm638, Jan 14, 2025)
6763442  starts working on pipeline_showcase.ipynb (Timm638, Jan 14, 2025)
cf0c3cf  saves in progress pipeline_showcase changes (Timm638, Jan 20, 2025)
b8eb445  adds Sensor Data Stage (Timm638, Jan 20, 2025)
f0b52f3  fixes copy bug in ARIMA (Timm638, Jan 20, 2025)
332c181  push latest pipeline changes (Timm638, Jan 20, 2025)
e3aafa3  completes draft of notebook (Timm638, Jan 21, 2025)
fbe7155  changed dataset presented to be a more periodic one (Timm638, Jan 21, 2025)
ed0a8bf  cleaned up draft more, each cell except for Linear Regression functions (Timm638, Jan 21, 2025)
542dd96  adjusts plot to be more descriptive (Timm638, Jan 22, 2025)
ec394c9  adds Example Data to git (Timm638, Jan 22, 2025)
697b02e  finishes up texts (Timm638, Jan 22, 2025)
ae07063  updates nbformat (Timm638, Jan 27, 2025)
7bc67ec  Merge branch 'develop' into feature/096_rtdip_demo_pipeline (Timm638, Jan 27, 2025)
f17b8b9  fixed imports (Timm638, Jan 27, 2025)
603df22  removes redundant .gitignore (Timm638, Jan 27, 2025)
44a1995  applies black (Timm638, Jan 27, 2025)
919d603  fixes missing preparation from pandas DF to spark DF (Timm638, Jan 27, 2025)
2e6b496  reapplies black (Timm638, Jan 28, 2025)
fc5f26c  updates imports to new ones (Timm638, Jan 29, 2025)
2 changes: 1 addition & 1 deletion .gitignore
@@ -139,4 +139,4 @@ spark-checkpoints/
 config.share
 
 # JetBrains
-.idea/
+.idea/
35,141 changes: 35,141 additions & 0 deletions notebook/Actual Generation per Production Type_2024-2025.csv

Large diffs are not rendered by default.

Binary file added notebook/ExampleData.pkl
Binary file not shown.
388 changes: 388 additions & 0 deletions notebook/pipeline_showcase.ipynb

Large diffs are not rendered by default.

787 changes: 787 additions & 0 deletions notebook/showcase_notebook.ipynb

Large diffs are not rendered by default.

@@ -20,4 +20,3 @@
 from .missing_value_imputation import MissingValueImputation
 from .out_of_range_value_filter import OutOfRangeValueFilter
 from .flatline_filter import FlatlineFilter
-from .missing_value_imputation import MissingValueImputation
@@ -233,7 +233,8 @@ def filter(self) -> PySparkDataFrame:
         Imputate missing values based on [Spline Interpolation, ]
         """
         if not all(
-            col_ in self.df.columns for col_ in ["TagName", "EventTime", "Value"]
+            col_ in self.df.columns
+            for col_ in ["TagName", "EventTime", "Value", "Status"]
         ):
             raise ValueError("Columns not as expected")
 
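
The hunk above tightens the input contract of MissingValueImputation: a "Status" column is now required alongside "TagName", "EventTime", and "Value". A minimal sketch of that guard against a hand-built Spark frame follows; the session setup and the sample row are assumptions for illustration, not part of this PR:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# One sample row in the four-column shape the component now expects.
df = spark.createDataFrame(
    [("Sensor_A", "2024-01-02 20:03:46", 1.0, "Good")],
    ["TagName", "EventTime", "Value", "Status"],
)

required = ["TagName", "EventTime", "Value", "Status"]
if not all(col_ in df.columns for col_ in required):
    raise ValueError("Columns not as expected")
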
7 changes: 2 additions & 5 deletions src/sdk/python/rtdip_sdk/pipelines/forecasting/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2022 RTDIP
+# Copyright 2025 RTDIP
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,7 +12,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .spark.linear_regression import *
-from .spark.arima import *
-from .spark.auto_arima import *
-from .spark.data_binning import *
+from .spark import *
@@ -11,3 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from .data_binning import DataBinning
+from .linear_regression import LinearRegression
+from .arima import ArimaPrediction
+from .auto_arima import ArimaAutoPrediction
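
Taken together, the two `__init__.py` hunks move the concrete exports into the `spark` subpackage and let the parent package re-export them with a single star import. Assuming the SDK is importable as `rtdip_sdk` (an assumption about the installed package name, not stated in the diff), both paths below should resolve to the same classes:

# Via the parent package, which now does `from .spark import *`:
from rtdip_sdk.pipelines.forecasting import ArimaPrediction

# Or explicitly via the subpackage that defines the names:
from rtdip_sdk.pipelines.forecasting.spark import ArimaAutoPrediction
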
30 changes: 16 additions & 14 deletions src/sdk/python/rtdip_sdk/pipelines/forecasting/spark/arima.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import copy
 import statistics
 from enum import Enum
 from typing import List, Tuple
@@ -264,8 +265,7 @@ def _constructor_handle_input_metadata(
         if past_data_style is not None:
             return past_data_style, value_name, timestamp_name, source_name, status_name
         # Automatic calculation part
-        schema = past_data.schema
-        schema_names = schema.names
+        schema_names = past_data.schema.names.copy()
 
         assumed_past_data_style = None
         value_name = None
@@ -279,20 +279,16 @@ def pickout_column(
             rgx = regex.compile(regex_string)
             sus_columns = list(filter(rgx.search, rem_columns))
             found_column = sus_columns[0] if len(sus_columns) == 1 else None
-            if found_column is not None:
-                rem_columns.remove(found_column)
-            return found_column, rem_columns
+            return found_column
 
         # Is there a status column?
-        status_name, remaining_columns = pickout_column(schema_names, r"(?i)status")
+        status_name = pickout_column(schema_names, r"(?i)status")
         # Is there a source name / tag
-        source_name, remaining_columns = pickout_column(schema_names, r"(?i)tag")
+        source_name = pickout_column(schema_names, r"(?i)tag")
         # Is there a timestamp column?
-        timestamp_name, remaining_columns = pickout_column(
-            schema_names, r"(?i)time|index"
-        )
+        timestamp_name = pickout_column(schema_names, r"(?i)time|index")
         # Is there a value column?
-        value_name, remaining_columns = pickout_column(schema_names, r"(?i)value")
+        value_name = pickout_column(schema_names, r"(?i)value")
 
         if source_name is not None:
             assumed_past_data_style = self.InputStyle.SOURCE_BASED
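
The refactor above simplifies `pickout_column` from returning a `(match, remaining)` tuple with in-place removal to returning only the match, treating an ambiguous pattern (more than one hit) the same as no hit. A standalone sketch of that behaviour, using the stdlib `re` in place of the `regex` package the file actually imports:

import re
from typing import List, Optional

def pickout_column(columns: List[str], pattern: str) -> Optional[str]:
    # Return the column name iff exactly one column matches the
    # case-insensitive pattern; ambiguity or no match yields None.
    matches = [c for c in columns if re.search(pattern, c)]
    return matches[0] if len(matches) == 1 else None

cols = ["TagName", "EventTime", "Value", "Status"]
assert pickout_column(cols, r"(?i)status") == "Status"
assert pickout_column(cols, r"(?i)time|index") == "EventTime"
assert pickout_column(["Tag1", "Tag2"], r"(?i)tag") is None  # ambiguous, so None
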
@@ -400,7 +396,7 @@ def filter(self) -> PySparkDataFrame:
             # Workaround needed for PySpark versions <3.4
             pd_df = _prepare_pandas_to_convert_to_spark(pd_df)
             predicted_source_pyspark_dataframe = self.spark_session.createDataFrame(
-                pd_df, schema=self.past_data.schema
+                pd_df, schema=copy.deepcopy(self.past_data.schema)
             )
             return predicted_source_pyspark_dataframe
         elif self.past_data_style == self.InputStyle.SOURCE_BASED:
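
The `copy.deepcopy` on the schema (paired with the new `import copy` at the top of the file) appears to be the "copy bug" fix from commit f0b52f3: a `StructType` is mutable, so handing the original `past_data.schema` to `createDataFrame` risks shared state between the old and new frames. A small sketch of the hazard, with hypothetical field names:

import copy
from pyspark.sql.types import FloatType, StringType, StructField, StructType

schema = StructType([
    StructField("TagName", StringType(), True),
    StructField("Value", FloatType(), True),
])

# Mutating a deep copy leaves the original schema object untouched;
# mutating a plain reference to the same StructType would not.
schema_copy = copy.deepcopy(schema)
schema_copy.fields[0].nullable = False
assert schema.fields[0].nullable is True
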
@@ -428,7 +424,12 @@ def filter(self) -> PySparkDataFrame:
             data_to_add = _prepare_pandas_to_convert_to_spark(data_to_add)
 
             predicted_source_pyspark_dataframe = self.spark_session.createDataFrame(
-                data_to_add, schema=pd_df_schema
+                _prepare_pandas_to_convert_to_spark(
+                    data_to_add[
+                        [self.source_name, self.timestamp_name, self.value_name]
+                    ]
+                ),
+                schema=pd_df_schema,
             )
 
             if self.status_name is not None:
@@ -438,7 +439,8 @@ def filter(self) -> PySparkDataFrame:
                 )
             )
 
-        return self.past_data.union(predicted_source_pyspark_dataframe)
+        to_return = self.past_data.unionByName(predicted_source_pyspark_dataframe)
+        return to_return
 
     def validate(self, schema_dict):
         return super().validate(schema_dict, self.past_data)
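
The switch from `union` to `unionByName` is defensive: `union` aligns columns purely by position, so if the predicted frame comes back with a different column order than `past_data`, values silently land in the wrong columns. A minimal sketch of the difference; the local session and toy frames are assumptions for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

past = spark.createDataFrame([("Sensor_A", 1.0)], ["TagName", "Value"])
pred = spark.createDataFrame([(2.0, "Sensor_A")], ["Value", "TagName"])

# union() is positional: the float 2.0 would end up under TagName.
# unionByName() matches on column names, so the rows line up correctly.
ok = past.unionByName(pred)
ok.show()
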
@@ -14,7 +14,7 @@
 
 import pyspark.ml.clustering as clustering
 from pyspark.sql import DataFrame
-from src.sdk.python.rtdip_sdk.pipelines.forecasting import (
+from src.sdk.python.rtdip_sdk.pipelines.forecasting.interfaces import (
     MachineLearningInterface,
 )
 from ..._pipeline_utils.models import Libraries, SystemType