-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathmodel.py
113 lines (95 loc) · 4.29 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import warnings
import joblib
import numpy as np
from typing import Optional
from pathlib import Path
from sklearn.base import BaseEstimator, TransformerMixin, RegressorMixin
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import paired_distances
from numpy.lib.stride_tricks import sliding_window_view
class SlidingWindowProcessor(BaseEstimator, TransformerMixin):
    """Turn a series into (window, next value) pairs for supervised learning.

    Every run of ``window_size`` consecutive values becomes one feature row
    and the value that follows the run becomes its target. When
    ``standardize`` is true the data is passed through a ``StandardScaler``
    before windowing.
    """

    def __init__(self, window_size: int, standardize: bool = False):
        self.window_size = window_size
        # Keep the constructor argument under its own name so that
        # BaseEstimator.get_params() (and with it clone() and repr()) can
        # read it back from the instance.
        self.standardize = standardize
        self.scaler = StandardScaler() if standardize else None

    def fit(self, X: np.ndarray, y: Optional[np.ndarray] = None, **fit_params) -> 'SlidingWindowProcessor':
        """Fit the scaler on ``X`` if standardization is enabled.

        ``y`` and ``fit_params`` are accepted for API compatibility and
        are not used.
        """
        if self.scaler is not None:
            self.scaler.fit(X)
        return self

    def transform(self, X: np.ndarray, y: Optional[np.ndarray] = None) -> "tuple[np.ndarray, np.ndarray]":
        """Build the windowed features and their targets from ``X``.

        ``y`` is unused (exists for compatibility).

        Returns a ``(new_X, new_y)`` tuple: ``new_X`` holds one window of
        ``window_size`` values per row, ``new_y`` the value following each
        window.
        """
        if self.scaler is not None:
            print("Standardizing input data")
            X = self.scaler.transform(X)
        # Windowing works on a flat series.
        X = X.reshape(-1)
        # the last window would have no target to predict, e.g. for n=10: [[1, 2] -> 3, ..., [8, 9] -> 10, [9, 10] -> ?]
        new_X = sliding_window_view(X, window_shape=(self.window_size))[:-1]
        # Shift left by one window and drop the wrapped-around tail, i.e.
        # keep everything after the first ``window_size`` values.
        new_y = np.roll(X, -self.window_size)[:-self.window_size]
        return new_X, new_y

    def transform_y(self, X: np.ndarray) -> np.ndarray:
        """Return only the targets that ``transform`` would pair with the windows.

        Unlike ``transform`` the input is not flattened first.
        """
        if self.scaler is not None:
            print("Standardizing input data")
            X = self.scaler.transform(X)
        return np.roll(X, -self.window_size)[:-self.window_size]

    def inverse_transform_y(self, y: np.ndarray, skip_inverse_scaling: bool = False) -> np.ndarray:
        """Align per-window values with the original series.

        The first ``window_size`` positions have no window in front of
        them, so they are filled with NaN and ``y`` is placed after them.
        Unless ``skip_inverse_scaling`` is set, the standardization is
        undone when a scaler is in use.
        """
        result = np.full(shape=self.window_size + len(y), fill_value=np.nan)
        # Index from the front: ``result[-len(y):]`` would select the whole
        # array for an empty ``y`` (``-0 == 0``) and fail to broadcast.
        result[self.window_size:] = y
        if not skip_inverse_scaling and self.scaler is not None:
            print("Reversing standardization for prediction")
            result = self.scaler.inverse_transform(result)
        return result
class RandomForestAnomalyDetector(BaseEstimator, RegressorMixin):
    """Forecast-based anomaly detector built on a random forest regressor.

    The series is cut into sliding windows, a ``RandomForestRegressor``
    learns to predict the value following each window, and the distance
    between prediction and actual value serves as the anomaly score.
    """

    def __init__(self,
                 train_window_size: int = 50,
                 n_trees: int = 100,
                 max_features_method: str = "auto",  # "sqrt", "log2"
                 bootstrap: bool = True,
                 max_samples: Optional[float] = None,  # fraction of all samples
                 standardize: bool = False,
                 random_state: int = 42,
                 verbose: int = 0,
                 n_jobs: int = 1,
                 # the following parameters control the tree size
                 max_depth: Optional[int] = None,
                 min_samples_split: int = 2,
                 min_samples_leaf: int = 1):
        # Keep every constructor argument under its own name so that
        # BaseEstimator.get_params() (and with it clone() and repr()) works.
        # NOTE: the helper objects below are built once, here; changing
        # these attributes afterwards does not reconfigure them.
        self.train_window_size = train_window_size
        self.n_trees = n_trees
        self.max_features_method = max_features_method
        self.bootstrap = bootstrap
        self.max_samples = max_samples
        self.standardize = standardize
        self.random_state = random_state
        self.verbose = verbose
        self.n_jobs = n_jobs
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf

        self.preprocessor = SlidingWindowProcessor(train_window_size, standardize)

        # For a regressor "auto" meant "consider all features". Newer
        # scikit-learn releases no longer accept that string, while the
        # fraction 1.0 selects all features in old and new releases alike.
        max_features = 1.0 if max_features_method == "auto" else max_features_method
        self.clf = RandomForestRegressor(
            n_estimators=n_trees,
            max_features=max_features,
            bootstrap=bootstrap,
            max_samples=max_samples,
            random_state=random_state,
            verbose=verbose,
            n_jobs=n_jobs,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
        )

    def fit(self, X: np.ndarray, y: Optional[np.ndarray] = None) -> 'RandomForestAnomalyDetector':
        """Fit the preprocessor and the forest on the series ``X``.

        The targets are derived from ``X`` itself; a supplied ``y`` only
        triggers a warning and is otherwise ignored.
        """
        if y is not None:
            warnings.warn("y is calculated from X. Please don't pass y to RandomForestAnomalyDetector.fit, it will be ignored!")
        X, y = self.preprocessor.fit_transform(X)
        self.clf.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict the next value for every window of ``X``.

        The result is aligned with the input series: the first
        ``train_window_size`` entries are NaN, and standardization (if
        enabled) is reversed.
        """
        X, _ = self.preprocessor.transform(X)
        y_hat = self._predict_internal(X)
        return self.preprocessor.inverse_transform_y(y_hat)

    def detect(self, X: np.ndarray) -> np.ndarray:
        """Return one anomaly score per point of ``X``.

        The score is the distance between the actual and the predicted
        value, computed on the preprocessed (possibly standardized) scale.
        The first ``train_window_size`` entries are NaN.
        """
        X, y = self.preprocessor.transform(X)
        y_hat = self._predict_internal(X)
        # Treat each value as a one-dimensional point and compare pairwise.
        scores = paired_distances(y.reshape(-1, 1), y_hat.reshape(-1, 1)).reshape(-1)
        # Scores are distances, not series values, so they must not be rescaled.
        return self.preprocessor.inverse_transform_y(scores, skip_inverse_scaling=True)

    def _predict_internal(self, X: np.ndarray) -> np.ndarray:
        """Run the forest on already windowed features."""
        return self.clf.predict(X)

    def save(self, path: Path) -> None:
        """Serialize the whole detector to ``path`` with joblib."""
        joblib.dump(self, path)

    @staticmethod
    def load(path: Path) -> 'RandomForestAnomalyDetector':
        """Load a detector previously written by ``save``.

        SECURITY: joblib files are pickle-based and can execute arbitrary
        code while loading; only load files from trusted sources.
        """
        return joblib.load(path)