diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 60867aa..28e2984 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -9,7 +9,7 @@ on: defaults: run: - shell: bash -l {0} + shell: bash concurrency: group: ${{ github.workflow }}-${{ github.ref }} @@ -27,7 +27,7 @@ jobs: platform-id: manylinux_x86_64 - os: windows-2019 platform-id: win_amd64 - - os: macos-latest + - os: macos-11 platform-id: macosx_x86_64 - os: macos-latest platform-id: macosx_arm64 @@ -41,6 +41,11 @@ jobs: CIBW_BUILD: cp38-${{ matrix.platform-id }} CIBW_ARCHS: all CIBW_TEST_SKIP: '*-macosx_arm64' + CIBW_BUILD_VERBOSITY: 3 + CIBW_BEFORE_BUILD_MACOS: | + sudo xcode-select -s /Applications/Xcode_11.7.app/Contents/Developer + pip install toml + python scripts/disable_omp_arm64.py - uses: actions/upload-artifact@v3 with: @@ -59,12 +64,13 @@ jobs: - name: Clone repo uses: actions/checkout@v3 - - name: Set up environment - uses: mamba-org/setup-micromamba@v1 + - uses: actions/setup-python@v4 with: - environment-file: environment.yml - create-args: python=${{ matrix.python-version }} - cache-environment: true + python-version: ${{ matrix.python-version }} + cache: pip + + - name: Install dependencies + run: pip install -r requirements-test.txt - name: Download wheels uses: actions/download-artifact@v3 diff --git a/.gitignore b/.gitignore index 4186e1e..4d5cbb2 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ *.so *.dylib *.dll +*.exp # Fortran module files *.mod diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e159b7..9a792da 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,11 +1,32 @@ cmake_minimum_required(VERSION 3.25) project(coreforecast) +option(USE_OPENMP "Enable OpenMP" ON) + if(NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE Release) endif() set(CMAKE_CXX_STANDARD 17) +if(USE_OPENMP) + if(APPLE) + find_package(OpenMP) + if(NOT OpenMP_FOUND) + # libomp 15.0+ from brew is keg-only, so have to search in other locations. + # See https://github.com/Homebrew/homebrew-core/issues/112107#issuecomment-1278042927. + execute_process(COMMAND brew --prefix libomp + OUTPUT_VARIABLE HOMEBREW_LIBOMP_PREFIX + OUTPUT_STRIP_TRAILING_WHITESPACE) + set(OpenMP_C_FLAGS "-Xpreprocessor -fopenmp -I${HOMEBREW_LIBOMP_PREFIX}/include") + set(OpenMP_CXX_FLAGS "-Xpreprocessor -fopenmp -I${HOMEBREW_LIBOMP_PREFIX}/include") + set(OpenMP_C_LIB_NAMES omp) + set(OpenMP_CXX_LIB_NAMES omp) + set(OpenMP_omp_LIBRARY ${HOMEBREW_LIBOMP_PREFIX}/lib/libomp.dylib) + endif() + endif() + find_package(OpenMP REQUIRED) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OpenMP_CXX_FLAGS}") +endif() if(APPLE) set(CMAKE_SHARED_LIBRARY_SUFFIX ".so") @@ -14,10 +35,9 @@ endif() if(UNIX) set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fPIC -O3 -Wall -Wextra -Wpedantic") else() - set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /O2 /Ot /Oy /W4") + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /O2 /Ob2 /Ot /Oy /W4") endif() - if(SKBUILD) set(LIBRARY_OUTPUT_PATH ${SKBUILD_PLATLIB_DIR}/coreforecast/lib) else() @@ -29,3 +49,7 @@ add_library(coreforecast SHARED src/coreforecast.cpp) if(MSVC) set_target_properties(coreforecast PROPERTIES OUTPUT_NAME "libcoreforecast") endif() + +if(USE_OPENMP AND CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang") + target_link_libraries(coreforecast PUBLIC OpenMP::OpenMP_CXX) +endif() diff --git a/coreforecast/grouped_array.py b/coreforecast/grouped_array.py index 84421fd..a1d2016 100644 --- a/coreforecast/grouped_array.py +++ b/coreforecast/grouped_array.py @@ -10,9 +10,6 @@ from importlib.resources import files -DTYPE_FLOAT32 = ctypes.c_int(0) -DTYPE_FLOAT64 = ctypes.c_int(1) - if platform.system() in ("Windows", "Microsoft"): prefix = "Release" extension = "dll" @@ -30,30 +27,31 @@ def _data_as_void_ptr(arr: np.ndarray): class GroupedArray: - def __init__(self, data: np.ndarray, indptr: np.ndarray): + def __init__(self, data: np.ndarray, indptr: np.ndarray, num_threads: int = 1): + data = np.ascontiguousarray(data, dtype=data.dtype) if data.dtype == np.float32: - self.dtype = DTYPE_FLOAT32 + self.prefix = "GroupedArrayFloat32" elif data.dtype == np.float64: - self.dtype = DTYPE_FLOAT64 + self.prefix = "GroupedArrayFloat64" else: - self.dtype = DTYPE_FLOAT32 + self.prefix = "GroupedArrayFloat32" data = data.astype(np.float32) self.data = data if indptr.dtype != np.int32: indptr = indptr.astype(np.int32) self.indptr = indptr self._handle = ctypes.c_void_p() - _LIB.GroupedArray_CreateFromArrays( + _LIB[f"{self.prefix}_Create"]( _data_as_void_ptr(data), ctypes.c_int32(data.size), indptr.ctypes.data_as(ctypes.POINTER(ctypes.c_int32)), - ctypes.c_int32(indptr.size), - self.dtype, + ctypes.c_int(indptr.size), + ctypes.c_int(num_threads), ctypes.byref(self._handle), ) def __del__(self): - _LIB.GroupedArray_Delete(self._handle, self.dtype) + _LIB[f"{self.prefix}_Delete"](self._handle) def __len__(self): return self.indptr.size - 1 @@ -61,32 +59,157 @@ def __len__(self): def __getitem__(self, i): return self.data[self.indptr[i] : self.indptr[i + 1]] - def scaler_fit(self, stats_fn_name: str) -> np.ndarray: - stats = np.full((len(self), 2), np.nan, dtype=self.data.dtype) - stats_fn = _LIB[stats_fn_name] - stats_fn( + def scaler_fit(self, scaler_type: str) -> np.ndarray: + stats = np.full_like(self.data, np.nan, shape=(len(self), 2)) + _LIB[f"{self.prefix}_{scaler_type}ScalerStats"]( self._handle, - self.dtype, _data_as_void_ptr(stats), ) return stats def scaler_transform(self, stats: np.ndarray) -> np.ndarray: out = np.full_like(self.data, np.nan) - _LIB.GroupedArray_ScalerTransform( + _LIB[f"{self.prefix}_ScalerTransform"]( self._handle, _data_as_void_ptr(stats), - self.dtype, _data_as_void_ptr(out), ) return out def scaler_inverse_transform(self, stats: np.ndarray) -> np.ndarray: out = np.empty_like(self.data) - _LIB.GroupedArray_ScalerInverseTransform( + _LIB[f"{self.prefix}_ScalerInverseTransform"]( self._handle, _data_as_void_ptr(stats), - self.dtype, + _data_as_void_ptr(out), + ) + return out + + def take_from_groups(self, k: int) -> np.ndarray: + out = np.empty_like(self.data, shape=len(self)) + _LIB[f"{self.prefix}_TakeFromGroups"]( + self._handle, + ctypes.c_int(k), + _data_as_void_ptr(out), + ) + return out + + def lag_transform(self, lag: int) -> np.ndarray: + out = np.full_like(self.data, np.nan) + _LIB[f"{self.prefix}_LagTransform"]( + self._handle, + ctypes.c_int(lag), + _data_as_void_ptr(out), + ) + return out + + def rolling_transform( + self, stat_name: str, lag: int, window_size: int, min_samples: int + ) -> np.ndarray: + out = np.full_like(self.data, np.nan) + _LIB[f"{self.prefix}_Rolling{stat_name}Transform"]( + self._handle, + ctypes.c_int(lag), + ctypes.c_int(window_size), + ctypes.c_int(min_samples), + _data_as_void_ptr(out), + ) + return out + + def rolling_update( + self, stat_name: str, lag: int, window_size: int, min_samples: int + ) -> np.ndarray: + out = np.empty_like(self.data, shape=len(self)) + _LIB[f"{self.prefix}_Rolling{stat_name}Update"]( + self._handle, + ctypes.c_int(lag), + ctypes.c_int(window_size), + ctypes.c_int(min_samples), + _data_as_void_ptr(out), + ) + return out + + def seasonal_rolling_transform( + self, + stat_name: str, + lag: int, + season_length: int, + window_size: int, + min_samples: int, + ) -> np.ndarray: + out = np.full_like(self.data, np.nan) + _LIB[f"{self.prefix}_SeasonalRolling{stat_name}Transform"]( + self._handle, + ctypes.c_int(lag), + ctypes.c_int(season_length), + ctypes.c_int(window_size), + ctypes.c_int(min_samples), + _data_as_void_ptr(out), + ) + return out + + def seasonal_rolling_update( + self, + stat_name: str, + lag: int, + season_length: int, + window_size: int, + min_samples: int, + ) -> np.ndarray: + out = np.empty_like(self.data, shape=len(self)) + _LIB[f"{self.prefix}_SeasonalRolling{stat_name}Update"]( + self._handle, + ctypes.c_int(lag), + ctypes.c_int(season_length), + ctypes.c_int(window_size), + ctypes.c_int(min_samples), + _data_as_void_ptr(out), + ) + return out + + def expanding_transform_with_aggs( + self, + stat_name: str, + lag: int, + aggs: np.ndarray, + ) -> np.ndarray: + out = np.full_like(self.data, np.nan) + _LIB[f"{self.prefix}_Expanding{stat_name}Transform"]( + self._handle, + ctypes.c_int(lag), + _data_as_void_ptr(out), + _data_as_void_ptr(aggs), + ) + return out + + def expanding_transform( + self, + stat_name: str, + lag: int, + ) -> np.ndarray: + out = np.full_like(self.data, np.nan) + _LIB[f"{self.prefix}_Expanding{stat_name}Transform"]( + self._handle, + ctypes.c_int(lag), + _data_as_void_ptr(out), + ) + return out + + def exponentially_weighted_transform( + self, + stat_name: str, + lag: int, + alpha: float, + ) -> np.ndarray: + out = np.full_like(self.data, np.nan) + if self.prefix == "GroupedArrayFloat32": + alpha = ctypes.c_float(alpha) + else: + alpha = ctypes.c_double(alpha) + _LIB[f"{self.prefix}_ExponentiallyWeighted{stat_name}Transform"]( + self._handle, + ctypes.c_int(lag), + alpha, _data_as_void_ptr(out), ) return out diff --git a/coreforecast/lag_transforms.py b/coreforecast/lag_transforms.py new file mode 100644 index 0000000..33c5ebf --- /dev/null +++ b/coreforecast/lag_transforms.py @@ -0,0 +1,209 @@ +__all__ = [ + "Lag", + "RollingMean", + "RollingStd", + "RollingMin", + "RollingMax", + "SeasonalRollingMean", + "SeasonalRollingStd", + "SeasonalRollingMin", + "SeasonalRollingMax", + "ExpandingMean", + "ExpandingStd", + "ExpandingMin", + "ExpandingMax", + "ExponentiallyWeightedMean", +] + +import abc +from typing import Callable, Optional + +import numpy as np + +from .grouped_array import GroupedArray + + +class BaseLagTransform(abc.ABC): + @abc.abstractmethod + def transform(self, ga: GroupedArray) -> np.ndarray: + ... + + @abc.abstractmethod + def update(self, ga: GroupedArray) -> np.ndarray: + ... + + +class Lag(BaseLagTransform): + def __init__(self, lag: int): + self.lag = lag + + def transform(self, ga: GroupedArray) -> np.ndarray: + return ga.lag_transform(self.lag) + + def update(self, ga: GroupedArray) -> np.ndarray: + return ga.take_from_groups(self.lag - 1) + + +class RollingBase(BaseLagTransform): + stat_name: str + lag: int + window_size: int + min_samples: int + + def __init__(self, lag: int, window_size: int, min_samples: Optional[int] = None): + self.lag = lag + if min_samples is None: + min_samples = window_size + if min_samples > window_size: + min_samples = window_size + self.window_size = window_size + self.min_samples = min_samples + + def transform(self, ga: GroupedArray) -> np.ndarray: + return ga.rolling_transform( + self.stat_name, self.lag, self.window_size, self.min_samples + ) + + def update(self, ga: GroupedArray) -> np.ndarray: + return ga.rolling_update( + self.stat_name, self.lag - 1, self.window_size, self.min_samples + ) + + +class RollingMean(RollingBase): + stat_name = "Mean" + + +class RollingStd(RollingBase): + stat_name = "Std" + + +class RollingMin(RollingBase): + stat_name = "Min" + + +class RollingMax(RollingBase): + stat_name = "Max" + + +class SeasonalRollingBase(RollingBase): + season_length: int + + def __init__( + self, + lag: int, + season_length: int, + window_size: int, + min_samples: Optional[int] = None, + ): + super().__init__(lag, window_size, min_samples) + self.season_length = season_length + + def transform(self, ga: GroupedArray) -> np.ndarray: + return ga.seasonal_rolling_transform( + self.stat_name, + self.lag, + self.season_length, + self.window_size, + self.min_samples, + ) + + def update(self, ga: GroupedArray) -> np.ndarray: + return ga.seasonal_rolling_update( + self.stat_name, + self.lag - 1, + self.season_length, + self.window_size, + self.min_samples, + ) + + +class SeasonalRollingMean(SeasonalRollingBase): + stat_name = "Mean" + + +class SeasonalRollingStd(SeasonalRollingBase): + stat_name = "Std" + + +class SeasonalRollingMin(SeasonalRollingBase): + stat_name = "Min" + + +class SeasonalRollingMax(SeasonalRollingBase): + stat_name = "Max" + + +class ExpandingBase(BaseLagTransform): + def __init__(self, lag: int): + self.lag = lag + + +class ExpandingMean(ExpandingBase): + def transform(self, ga: GroupedArray) -> np.ndarray: + self.n = np.empty_like(ga.data, shape=len(ga)) + out = ga.expanding_transform_with_aggs("Mean", self.lag, self.n) + self.cumsum = out[ga.indptr[1:] - 1] * self.n + return out + + def update(self, ga: GroupedArray) -> np.ndarray: + self.n += 1 + self.cumsum += ga.take_from_groups(self.lag - 1) + return self.cumsum / self.n + + +class ExpandingStd(ExpandingBase): + def transform(self, ga: GroupedArray) -> np.ndarray: + self.stats = np.empty_like(ga.data, shape=(len(ga), 3)) + out = ga.expanding_transform_with_aggs("Std", self.lag, self.stats) + return out + + def update(self, ga: GroupedArray) -> np.ndarray: + x = ga.take_from_groups(self.lag - 1) + self.stats[:, 0] += 1.0 + n = self.stats[:, 0] + prev_avg = self.stats[:, 1].copy() + self.stats[:, 1] = prev_avg + (x - prev_avg) / n + self.stats[:, 2] += (x - prev_avg) * (x - self.stats[:, 1]) + self.stats[:, 2] = np.maximum(self.stats[:, 2], 0.0) + return np.sqrt(self.stats[:, 2] / (n - 1)) + + +class ExpandingComp(ExpandingBase): + stat: str + comp_fn: Callable + + def transform(self, ga: GroupedArray) -> np.ndarray: + out = ga.expanding_transform(self.stat, self.lag) + self.stats = out[ga.indptr[1:] - 1] + return out + + def update(self, ga: GroupedArray) -> np.ndarray: + self.stats = self.comp_fn(self.stats, ga.take_from_groups(self.lag - 1)) + return self.stats + + +class ExpandingMin(ExpandingComp): + stat = "Min" + comp_fn = np.minimum + + +class ExpandingMax(ExpandingComp): + stat = "Max" + comp_fn = np.maximum + + +class ExponentiallyWeightedMean(BaseLagTransform): + def __init__(self, lag: int, alpha: float): + self.lag = lag + self.alpha = alpha + + def transform(self, ga: GroupedArray) -> np.ndarray: + out = ga.exponentially_weighted_transform("Mean", self.lag, self.alpha) + self.ewm = out[ga.indptr[1:] - 1] + return out + + def update(self, ga: GroupedArray) -> np.ndarray: + x = ga.take_from_groups(self.lag - 1) + self.ewm = self.alpha * x + (1 - self.alpha) * self.ewm + return self.ewm diff --git a/coreforecast/scalers.py b/coreforecast/scalers.py index 987d8ed..76ade3f 100644 --- a/coreforecast/scalers.py +++ b/coreforecast/scalers.py @@ -4,10 +4,10 @@ class BaseLocalScaler: - stats_fn_name: str + scaler_type: str def fit(self, ga: GroupedArray) -> "BaseLocalScaler": - self.stats_ = ga.scaler_fit(self.stats_fn_name) + self.stats_ = ga.scaler_fit(self.scaler_type) return self def transform(self, ga: GroupedArray) -> np.ndarray: @@ -18,16 +18,16 @@ def inverse_transform(self, ga: GroupedArray) -> np.ndarray: class LocalMinMaxScaler(BaseLocalScaler): - stats_fn_name = "GroupedArray_MinMaxScalerStats" + scaler_type = "MinMax" class LocalStandardScaler(BaseLocalScaler): - stats_fn_name = "GroupedArray_StandardScalerStats" + scaler_type = "Standard" class LocalRobustScaler(BaseLocalScaler): def __init__(self, scale: str): if scale == "iqr": - self.stats_fn_name = "GroupedArray_RobustScalerIqrStats" + self.scaler_type = "RobustIqr" else: - self.stats_fn_name = "GroupedArray_RobustScalerMadStats" + self.scaler_type = "RobustMad" diff --git a/dev_environment.yml b/dev_environment.yml index 51c547d..452e500 100644 --- a/dev_environment.yml +++ b/dev_environment.yml @@ -14,3 +14,4 @@ dependencies: - pytest-benchmark - scikit-build-core - utilsforecast>=0.0.7 + - window-ops diff --git a/environment.yml b/environment.yml index 5e85e8e..cbf0e9f 100644 --- a/environment.yml +++ b/environment.yml @@ -10,3 +10,4 @@ dependencies: - pytest-benchmark - scikit-build-core - utilsforecast>=0.0.7 + - window-ops diff --git a/include/coreforecast.h b/include/coreforecast.h index 5d9cd7b..819513d 100644 --- a/include/coreforecast.h +++ b/include/coreforecast.h @@ -8,36 +8,232 @@ #define DLL_EXPORT #endif -typedef void *GroupedArrayHandle; - -#define DTYPE_FLOAT32 (0) -#define DTYPE_FLOAT64 (1) +using GroupedArrayHandle = void *; +using indptr_t = int32_t; extern "C" { -DLL_EXPORT int GroupedArray_CreateFromArrays(const void *data, int32_t n_data, - int32_t *indptr, int32_t n_groups, - int data_type, - GroupedArrayHandle *out); +DLL_EXPORT int GroupedArrayFloat32_Create(const float *data, int32_t n_data, + int32_t *indptr, int32_t n_indptr, + int num_threads, + GroupedArrayHandle *out); +DLL_EXPORT int GroupedArrayFloat64_Create(const double *data, int32_t n_data, + int32_t *indptr, int32_t n_indptr, + int num_threads, + GroupedArrayHandle *out); + +DLL_EXPORT int GroupedArrayFloat32_Delete(GroupedArrayHandle handle); +DLL_EXPORT int GroupedArrayFloat64_Delete(GroupedArrayHandle handle); + +DLL_EXPORT int GroupedArrayFloat32_MinMaxScalerStats(GroupedArrayHandle handle, + float *out); +DLL_EXPORT int GroupedArrayFloat64_MinMaxScalerStats(GroupedArrayHandle handle, + double *out); + +DLL_EXPORT int +GroupedArrayFloat32_StandardScalerStats(GroupedArrayHandle handle, float *out); +DLL_EXPORT int +GroupedArrayFloat64_StandardScalerStats(GroupedArrayHandle handle, double *out); + +DLL_EXPORT int +GroupedArrayFloat32_RobustIqrScalerStats(GroupedArrayHandle handle, float *out); +DLL_EXPORT int +GroupedArrayFloat64_RobustIqrScalerStats(GroupedArrayHandle handle, + double *out); + +DLL_EXPORT int +GroupedArrayFloat32_RobustMadScalerStats(GroupedArrayHandle handle, float *out); +DLL_EXPORT int +GroupedArrayFloat64_RobustMadScalerStats(GroupedArrayHandle handle, + double *out); + +DLL_EXPORT int GroupedArrayFloat32_ScalerTransform(GroupedArrayHandle handle, + const float *stats, + float *out); +DLL_EXPORT int GroupedArrayFloat64_ScalerTransform(GroupedArrayHandle handle, + const double *stats, + double *out); + +DLL_EXPORT int +GroupedArrayFloat32_ScalerInverseTransform(GroupedArrayHandle handle, + const float *stats, float *out); +DLL_EXPORT int +GroupedArrayFloat64_ScalerInverseTransform(GroupedArrayHandle handle, + const double *stats, double *out); + +DLL_EXPORT int GroupedArrayFloat32_TakeFromGroups(GroupedArrayHandle handle, + int k, float *out); +DLL_EXPORT int GroupedArrayFloat64_TakeFromGroups(GroupedArrayHandle handle, + int k, double *out); + +DLL_EXPORT int GroupedArrayFloat32_LagTransform(GroupedArrayHandle handle, + int lag, float *out); +DLL_EXPORT int GroupedArrayFloat64_LagTransform(GroupedArrayHandle handle, + int lag, double *out); + +DLL_EXPORT int +GroupedArrayFloat32_RollingMeanTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + float *out); +DLL_EXPORT int +GroupedArrayFloat64_RollingMeanTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + double *out); + +DLL_EXPORT int +GroupedArrayFloat32_RollingStdTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + float *out); +DLL_EXPORT int +GroupedArrayFloat64_RollingStdTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + double *out); + +DLL_EXPORT int +GroupedArrayFloat32_RollingMinTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + float *out); +DLL_EXPORT int +GroupedArrayFloat64_RollingMinTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + double *out); + +DLL_EXPORT int +GroupedArrayFloat32_RollingMaxTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + float *out); +DLL_EXPORT int +GroupedArrayFloat64_RollingMaxTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + double *out); + +DLL_EXPORT int GroupedArrayFloat32_RollingMeanUpdate(GroupedArrayHandle handle, + int lag, int window_size, + int min_samples, + float *out); +DLL_EXPORT int GroupedArrayFloat64_RollingMeanUpdate(GroupedArrayHandle handle, + int lag, int window_size, + int min_samples, + double *out); + +DLL_EXPORT int GroupedArrayFloat32_RollingStdUpdate(GroupedArrayHandle handle, + int lag, int window_size, + int min_samples, + float *out); +DLL_EXPORT int GroupedArrayFloat64_RollingStdUpdate(GroupedArrayHandle handle, + int lag, int window_size, + int min_samples, + double *out); + +DLL_EXPORT int GroupedArrayFloat32_RollingMinUpdate(GroupedArrayHandle handle, + int lag, int window_size, + int min_samples, + float *out); +DLL_EXPORT int GroupedArrayFloat64_RollingMinUpdate(GroupedArrayHandle handle, + int lag, int window_size, + int min_samples, + double *out); + +DLL_EXPORT int GroupedArrayFloat32_RollingMaxUpdate(GroupedArrayHandle handle, + int lag, int window_size, + int min_samples, + float *out); +DLL_EXPORT int GroupedArrayFloat64_RollingMaxUpdate(GroupedArrayHandle handle, + int lag, int window_size, + int min_samples, + double *out); + +DLL_EXPORT int GroupedArrayFloat32_SeasonalRollingMeanTransform( + GroupedArrayHandle handle, int lag, int season_length, int window_size, + int min_samples, float *out); +DLL_EXPORT int GroupedArrayFloat64_SeasonalRollingMeanTransform( + GroupedArrayHandle handle, int lag, int season_length, int window_size, + int min_samples, double *out); + +DLL_EXPORT int GroupedArrayFloat32_SeasonalRollingStdTransform( + GroupedArrayHandle handle, int lag, int season_length, int window_size, + int min_samples, float *out); +DLL_EXPORT int GroupedArrayFloat64_SeasonalRollingStdTransform( + GroupedArrayHandle handle, int lag, int season_length, int window_size, + int min_samples, double *out); + +DLL_EXPORT int GroupedArrayFloat32_SeasonalRollingMinTransform( + GroupedArrayHandle handle, int lag, int season_length, int window_size, + int min_samples, float *out); +DLL_EXPORT int GroupedArrayFloat64_SeasonalRollingMinTransform( + GroupedArrayHandle handle, int lag, int season_length, int window_size, + int min_samples, double *out); + +DLL_EXPORT int GroupedArrayFloat32_SeasonalRollingMaxTransform( + GroupedArrayHandle handle, int lag, int season_length, int window_size, + int min_samples, float *out); +DLL_EXPORT int GroupedArrayFloat64_SeasonalRollingMaxTransform( + GroupedArrayHandle handle, int lag, int season_length, int window_size, + int min_samples, double *out); + +DLL_EXPORT int GroupedArrayFloat32_SeasonalRollingMeanUpdate( + GroupedArrayHandle handle, int lag, int season_length, int window_size, + int min_samples, float *out); +DLL_EXPORT int GroupedArrayFloat64_SeasonalRollingMeanUpdate( + GroupedArrayHandle handle, int lag, int season_length, int window_size, + int min_samples, double *out); + +DLL_EXPORT int +GroupedArrayFloat32_SeasonalRollingStdUpdate(GroupedArrayHandle handle, int lag, + int season_length, int window_size, + int min_samples, float *out); +DLL_EXPORT int +GroupedArrayFloat64_SeasonalRollingStdUpdate(GroupedArrayHandle handle, int lag, + int season_length, int window_size, + int min_samples, double *out); -DLL_EXPORT int GroupedArray_Delete(GroupedArrayHandle handle, int data_type); +DLL_EXPORT int +GroupedArrayFloat32_SeasonalRollingMinUpdate(GroupedArrayHandle handle, int lag, + int season_length, int window_size, + int min_samples, float *out); +DLL_EXPORT int +GroupedArrayFloat64_SeasonalRollingMinUpdate(GroupedArrayHandle handle, int lag, + int season_length, int window_size, + int min_samples, double *out); -DLL_EXPORT int GroupedArray_MinMaxScalerStats(GroupedArrayHandle handle, - int data_type, void *out); +DLL_EXPORT int +GroupedArrayFloat32_SeasonalRollingMaxUpdate(GroupedArrayHandle handle, int lag, + int season_length, int window_size, + int min_samples, float *out); +DLL_EXPORT int +GroupedArrayFloat64_SeasonalRollingMaxUpdate(GroupedArrayHandle handle, int lag, + int season_length, int window_size, + int min_samples, double *out); -DLL_EXPORT int GroupedArray_StandardScalerStats(GroupedArrayHandle handle, - int data_type, void *out); +DLL_EXPORT int +GroupedArrayFloat32_ExpandingMeanTransform(GroupedArrayHandle handle, int lag, + float *out, float *agg); +DLL_EXPORT int +GroupedArrayFloat64_ExpandingMeanTransform(GroupedArrayHandle handle, int lag, + double *out, double *agg); -DLL_EXPORT int GroupedArray_RobustScalerIqrStats(GroupedArrayHandle handle, - int data_type, void *out); +DLL_EXPORT int +GroupedArrayFloat32_ExpandingStdTransform(GroupedArrayHandle handle, int lag, + float *out, float *agg); +DLL_EXPORT int +GroupedArrayFloat64_ExpandingStdTransform(GroupedArrayHandle handle, int lag, + double *out, double *agg); -DLL_EXPORT int GroupedArray_RobustScalerMadStats(GroupedArrayHandle handle, - int data_type, void *out); +DLL_EXPORT int +GroupedArrayFloat32_ExpandingMinTransform(GroupedArrayHandle handle, int lag, + float *out); +DLL_EXPORT int +GroupedArrayFloat64_ExpandingMinTransform(GroupedArrayHandle handle, int lag, + double *out); -DLL_EXPORT int GroupedArray_ScalerTransform(GroupedArrayHandle handle, - const void *stats, int data_type, - void *out); +DLL_EXPORT int +GroupedArrayFloat32_ExpandingMaxTransform(GroupedArrayHandle handle, int lag, + float *out); +DLL_EXPORT int +GroupedArrayFloat64_ExpandingMaxTransform(GroupedArrayHandle handle, int lag, + double *out); -DLL_EXPORT int GroupedArray_ScalerInverseTransform(GroupedArrayHandle handle, - const void *stats, - int data_type, void *out); +DLL_EXPORT int GroupedArrayFloat32_ExponentiallyWeightedMeanTransform( + GroupedArrayHandle handle, int lag, float alpha, float *out); +DLL_EXPORT int GroupedArrayFloat64_ExponentiallyWeightedMeanTransform( + GroupedArrayHandle handle, int lag, double alpha, double *out); } diff --git a/pyproject.toml b/pyproject.toml index 928db03..409ed84 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ version = "0.0.1" requires-python = ">=3.8" dependencies = [ "importlib_resources ; python_version < '3.10'", - "numpy", + "numpy>=1.20.0", ] license = {file = "LICENSE"} classifiers = [ @@ -43,5 +43,5 @@ wheel.packages = ["coreforecast"] wheel.py-api = "py3" [tool.cibuildwheel] -test-requires = "pytest" +test-requires = "pytest window-ops" test-command = "pytest {project}/tests -k correct" diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 0000000..a6a9968 --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,7 @@ +importlib_resources ; python_version < "3.10" +numba +numpy +pytest +pytest-benchmark +utilsforecast>=0.0.7 +window-ops diff --git a/scripts/disable_omp_arm64.py b/scripts/disable_omp_arm64.py new file mode 100644 index 0000000..73aff09 --- /dev/null +++ b/scripts/disable_omp_arm64.py @@ -0,0 +1,11 @@ +import os + +import toml + + +if os.environ["CIBW_BUILD"] == "cp38-macosx_arm64": + with open("pyproject.toml", "rt") as f: + cfg = toml.load(f) + cfg["tool"]["scikit-build"]["cmake"]["args"] = ["-DUSE_OPENMP=OFF"] + with open("pyproject.toml", "wt") as f: + toml.dump(cfg, f) diff --git a/src/coreforecast.cpp b/src/coreforecast.cpp index 96745a7..9042b0a 100644 --- a/src/coreforecast.cpp +++ b/src/coreforecast.cpp @@ -14,14 +14,23 @@ inline T CommonScalerInverseTransform(T data, T scale, T offset) { return data * scale + offset; } -template inline int FirstNotNaN(const T *data, int n) { - int i = 0; +template inline indptr_t FirstNotNaN(const T *data, indptr_t n) { + indptr_t i = 0; while (std::isnan(data[i]) && i < n) { ++i; } return i; } +template +inline void TakeFromGroups(const T *data, int n, T *out, int k) { + if (k > n) { + *out = std::numeric_limits::quiet_NaN(); + } else { + *out = data[n - 1 - k]; + } +} + template inline void MinMaxScalerStats(const T *data, int n, T *stats) { T min = std::numeric_limits::infinity(); @@ -87,143 +96,866 @@ inline void RobustScalerMadStats(const T *data, int n, T *stats) { delete[] buffer; } +template inline void LagTransform(const T *data, int n, T *out) { + std::copy(data, data + n, out); +} + +template +inline void RollingMeanTransform(const T *data, int n, T *out, int window_size, + int min_samples) { + T accum = static_cast(0.0); + int upper_limit = std::min(window_size, n); + for (int i = 0; i < upper_limit; ++i) { + accum += data[i]; + if (i + 1 >= min_samples) + out[i] = accum / (i + 1); + } + + for (int i = window_size; i < n; ++i) { + accum += data[i] - data[i - window_size]; + out[i] = accum / window_size; + } +} + +template +inline void RollingStdTransformWithStats(const T *data, int n, T *out, T *agg, + bool save_stats, int window_size, + int min_samples) { + T prev_avg = static_cast(0.0); + T curr_avg = data[0]; + T m2 = static_cast(0.0); + int upper_limit = std::min(window_size, n); + for (int i = 0; i < upper_limit; ++i) { + prev_avg = curr_avg; + curr_avg = prev_avg + (data[i] - prev_avg) / (i + 1); + m2 += (data[i] - prev_avg) * (data[i] - curr_avg); + if (i + 1 >= min_samples) + out[i] = sqrt(m2 / i); + } + for (int i = window_size; i < n; ++i) { + T delta = data[i] - data[i - window_size]; + prev_avg = curr_avg; + curr_avg = prev_avg + delta / window_size; + m2 += delta * (data[i] - curr_avg + data[i - window_size] - prev_avg); + // avoid possible loss of precision + m2 = std::max(m2, static_cast(0.0)); + out[i] = sqrt(m2 / (window_size - 1)); + } + if (save_stats) { + agg[0] = static_cast(n); + agg[1] = curr_avg; + agg[2] = m2; + } +} + +template +inline void RollingStdTransform(const T *data, int n, T *out, int window_size, + int min_samples) { + T tmp; + RollingStdTransformWithStats(data, n, out, &tmp, false, window_size, + min_samples); +} + +template +inline void RollingCompTransform(Func Comp, const T *data, int n, T *out, + int window_size, int min_samples) { + int upper_limit = std::min(window_size, n); + T pivot = data[0]; + for (int i = 0; i < upper_limit; ++i) { + if (Comp(data[i], pivot)) { + pivot = data[i]; + } + if (i + 1 >= min_samples) { + out[i] = pivot; + } + } + for (int i = window_size; i < n; ++i) { + pivot = data[i]; + for (int j = 0; j < window_size; ++j) { + if (Comp(data[i - j], pivot)) { + pivot = data[i - j]; + } + } + out[i] = pivot; + } +} + +template struct RollingMinTransform { + void operator()(const T *data, int n, T *out, int window_size, + int min_samples) { + RollingCompTransform(std::less(), data, n, out, window_size, + min_samples); + } +}; + +template struct RollingMaxTransform { + void operator()(const T *data, int n, T *out, int window_size, + int min_samples) const { + RollingCompTransform(std::greater(), data, n, out, window_size, + min_samples); + } +}; + +template +inline void SeasonalRollingTransform(Func RollingTfm, const T *data, int n, + T *out, int season_length, int window_size, + int min_samples) { + int buff_size = n / season_length + (n % season_length > 0); + T *season_data = new T[buff_size]; + T *season_out = new T[buff_size]; + std::fill_n(season_out, buff_size, std::numeric_limits::quiet_NaN()); + for (int i = 0; i < season_length; ++i) { + int season_n = n / season_length + (i < n % season_length); + for (int j = 0; j < season_n; ++j) { + season_data[j] = data[i + j * season_length]; + } + RollingTfm(season_data, season_n, season_out, window_size, min_samples); + for (int j = 0; j < season_n; ++j) { + out[i + j * season_length] = season_out[j]; + } + } + delete[] season_data; + delete[] season_out; +} + +template struct SeasonalRollingMeanTransform { + void operator()(const T *data, int n, T *out, int season_length, + int window_size, int min_samples) { + SeasonalRollingTransform(RollingMeanTransform, data, n, out, + season_length, window_size, min_samples); + } +}; + +template struct SeasonalRollingStdTransform { + void operator()(const T *data, int n, T *out, int season_length, + int window_size, int min_samples) { + SeasonalRollingTransform(RollingStdTransform, data, n, out, + season_length, window_size, min_samples); + } +}; + +template struct SeasonalRollingMinTransform { + void operator()(const T *data, int n, T *out, int season_length, + int window_size, int min_samples) { + SeasonalRollingTransform(RollingMinTransform(), data, n, out, + season_length, window_size, min_samples); + } +}; + +template struct SeasonalRollingMaxTransform { + void operator()(const T *data, int n, T *out, int season_length, + int window_size, int min_samples) { + SeasonalRollingTransform(RollingMaxTransform(), data, n, out, + season_length, window_size, min_samples); + } +}; + +template +inline void RollingUpdate(Func RollingTfm, const T *data, int n, T *out, + int window_size, int min_samples) { + if (n < min_samples) { + *out = std::numeric_limits::quiet_NaN(); + return; + } + int n_samples = std::min(window_size, n); + T *buffer = new T[n_samples]; + RollingTfm(data + n - n_samples, n_samples, buffer, window_size, min_samples); + *out = buffer[n_samples - 1]; + delete[] buffer; +} + +template struct RollingMeanUpdate { + void operator()(const T *data, int n, T *out, int window_size, + int min_samples) { + RollingUpdate(RollingMeanTransform, data, n, out, window_size, + min_samples); + } +}; + +template struct RollingStdUpdate { + void operator()(const T *data, int n, T *out, int window_size, + int min_samples) { + RollingUpdate(RollingStdTransform, data, n, out, window_size, + min_samples); + } +}; + +template struct RollingMinUpdate { + void operator()(const T *data, int n, T *out, int window_size, + int min_samples) { + RollingUpdate(RollingMinTransform(), data, n, out, window_size, + min_samples); + } +}; + +template struct RollingMaxUpdate { + void operator()(const T *data, int n, T *out, int window_size, + int min_samples) { + RollingUpdate(RollingMaxTransform(), data, n, out, window_size, + min_samples); + } +}; + +template +inline void SeasonalRollingUpdate(Func RollingUpdate, const T *data, int n, + T *out, int season_length, int window_size, + int min_samples) { + int season = n % season_length; + int season_n = n / season_length + (season > 0); + if (season_n < min_samples) { + *out = std::numeric_limits::quiet_NaN(); + return; + } + int n_samples = std::min(window_size, season_n); + T *season_data = new T[n_samples]; + for (int i = 0; i < n_samples; ++i) { + season_data[i] = data[n - 1 - (n_samples - 1 - i) * season_length]; + } + RollingUpdate(season_data, n_samples, out, window_size, min_samples); + delete[] season_data; +} + +template struct SeasonalRollingMeanUpdate { + void operator()(const T *data, int n, T *out, int season_length, + int window_size, int min_samples) { + SeasonalRollingUpdate(RollingMeanUpdate(), data, n, out, season_length, + window_size, min_samples); + } +}; + +template struct SeasonalRollingStdUpdate { + void operator()(const T *data, int n, T *out, int season_length, + int window_size, int min_samples) { + SeasonalRollingUpdate(RollingStdUpdate(), data, n, out, season_length, + window_size, min_samples); + } +}; + +template struct SeasonalRollingMinUpdate { + void operator()(const T *data, int n, T *out, int season_length, + int window_size, int min_samples) { + SeasonalRollingUpdate(RollingMinUpdate(), data, n, out, season_length, + window_size, min_samples); + } +}; + +template struct SeasonalRollingMaxUpdate { + void operator()(const T *data, int n, T *out, int season_length, + int window_size, int min_samples) { + SeasonalRollingUpdate(RollingMaxUpdate(), data, n, out, season_length, + window_size, min_samples); + } +}; + +template +inline void ExpandingMeanTransform(const T *data, int n, T *out, T *agg) { + T accum = static_cast(0.0); + for (int i = 0; i < n; ++i) { + accum += data[i]; + out[i] = accum / (i + 1); + } + *agg = static_cast(n); +} + +template +inline void ExpandingStdTransform(const T *data, int n, T *out, T *agg) { + RollingStdTransformWithStats(data, n, out, agg, true, n, 2); +} + +template struct ExpandingMinTransform { + void operator()(const T *data, int n, T *out) { + RollingCompTransform(std::less(), data, n, out, n, 1); + } +}; + +template struct ExpandingMaxTransform { + void operator()(const T *data, int n, T *out) { + RollingCompTransform(std::greater(), data, n, out, n, 1); + } +}; + +template +inline void ExponentiallyWeightedMeanTransform(const T *data, int n, T *out, + T alpha) { + out[0] = data[0]; + for (int i = 1; i < n; ++i) { + out[i] = alpha * data[i] + (1 - alpha) * out[i - 1]; + } +} + template class GroupedArray { private: const T *data_; - int32_t n_data_; - int32_t *indptr_; - int32_t n_groups_; + indptr_t n_data_; + const indptr_t *indptr_; + int n_groups_; + int num_threads_; public: - GroupedArray(const T *data, int32_t n_data, int32_t *indptr, int32_t n_indptr) - : data_(data), n_data_(n_data), indptr_(indptr), n_groups_(n_indptr - 1) { - } + GroupedArray(const T *data, indptr_t n_data, const indptr_t *indptr, + int n_indptr, int num_threads) + : data_(data), n_data_(n_data), indptr_(indptr), n_groups_(n_indptr - 1), + num_threads_(num_threads) {} ~GroupedArray() {} - template void ComputeStats(Func f, T *out) const { + template + void Reduce(Func f, int n_out, T *out, int lag, + Args &&...args) const noexcept { +#pragma omp parallel for schedule(static) num_threads(num_threads_) for (int i = 0; i < n_groups_; ++i) { - int32_t start = indptr_[i]; - int32_t end = indptr_[i + 1]; - int32_t n = end - start; - int start_idx = FirstNotNaN(data_ + start, n); - if (start_idx == n) + indptr_t start = indptr_[i]; + indptr_t end = indptr_[i + 1]; + indptr_t n = end - start; + indptr_t start_idx = FirstNotNaN(data_ + start, n); + if (start_idx + lag >= n) continue; - f(data_ + start + start_idx, n - start_idx, out + 2 * i); + f(data_ + start + start_idx, n - start_idx - lag, out + n_out * i, + std::forward(args)...); } } template - void ScalerTransform(Func f, const T *stats, T *out) const { + void ScalerTransform(Func f, const T *stats, T *out) const noexcept { +#pragma omp parallel for schedule(static) num_threads(num_threads_) for (int i = 0; i < n_groups_; ++i) { - int32_t start = indptr_[i]; - int32_t end = indptr_[i + 1]; + indptr_t start = indptr_[i]; + indptr_t end = indptr_[i + 1]; T offset = stats[2 * i]; T scale = stats[2 * i + 1]; - for (int32_t j = start; j < end; ++j) { + if (std::abs(scale) < std::numeric_limits::epsilon()) { + scale = static_cast(1.0); + } + for (indptr_t j = start; j < end; ++j) { out[j] = f(data_[j], scale, offset); } } } -}; -int GroupedArray_CreateFromArrays(const void *data, int32_t n_data, - int32_t *indptr, int32_t n_groups, - int data_type, GroupedArrayHandle *out) { - if (data_type == DTYPE_FLOAT32) { - *out = new GroupedArray(static_cast(data), n_data, - indptr, n_groups); - } else { - *out = new GroupedArray(static_cast(data), n_data, - indptr, n_groups); + template + void Transform(Func f, int lag, T *out, Args &&...args) const noexcept { +#pragma omp parallel for schedule(static) num_threads(num_threads_) + for (int i = 0; i < n_groups_; ++i) { + indptr_t start = indptr_[i]; + indptr_t end = indptr_[i + 1]; + indptr_t n = end - start; + indptr_t start_idx = FirstNotNaN(data_ + start, n); + if (start_idx + lag >= n) { + continue; + } + start += start_idx; + f(data_ + start, n - start_idx - lag, out + start + lag, + std::forward(args)...); + } } + + template + void TransformAndReduce(Func f, int lag, T *out, int n_agg, T *agg, + Args &&...args) const noexcept { +#pragma omp parallel for schedule(static) num_threads(num_threads_) + for (int i = 0; i < n_groups_; ++i) { + indptr_t start = indptr_[i]; + indptr_t end = indptr_[i + 1]; + indptr_t n = end - start; + indptr_t start_idx = FirstNotNaN(data_ + start, n); + if (start_idx + lag >= n) { + continue; + } + start += start_idx; + f(data_ + start, n - start_idx - lag, out + start + lag, agg + i * n_agg, + std::forward(args)...); + } + } +}; + +int GroupedArrayFloat32_Create(const float *data, indptr_t n_data, + indptr_t *indptr, indptr_t n_indptr, + int num_threads, GroupedArrayHandle *out) { + *out = new GroupedArray(data, n_data, indptr, n_indptr, num_threads); + return 0; +} +int GroupedArrayFloat64_Create(const double *data, indptr_t n_data, + indptr_t *indptr, indptr_t n_indptr, + int num_threads, GroupedArrayHandle *out) { + *out = new GroupedArray(data, n_data, indptr, n_indptr, num_threads); return 0; } -int GroupedArray_Delete(GroupedArrayHandle handle, int data_type) { - if (data_type == DTYPE_FLOAT32) { - delete reinterpret_cast *>(handle); - } else { - delete reinterpret_cast *>(handle); - } +int GroupedArrayFloat32_Delete(GroupedArrayHandle handle) { + delete reinterpret_cast *>(handle); + return 0; +} +int GroupedArrayFloat64_Delete(GroupedArrayHandle handle) { + delete reinterpret_cast *>(handle); return 0; } -int GroupedArray_MinMaxScalerStats(GroupedArrayHandle handle, int data_type, - void *out) { - if (data_type == DTYPE_FLOAT32) { - auto ga = reinterpret_cast *>(handle); - ga->ComputeStats(MinMaxScalerStats, static_cast(out)); - } else { - auto ga = reinterpret_cast *>(handle); - ga->ComputeStats(MinMaxScalerStats, static_cast(out)); - } +int GroupedArrayFloat32_MinMaxScalerStats(GroupedArrayHandle handle, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(MinMaxScalerStats, 2, out, 0); + return 0; +} +int GroupedArrayFloat64_MinMaxScalerStats(GroupedArrayHandle handle, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(MinMaxScalerStats, 2, out, 0); return 0; } -int GroupedArray_StandardScalerStats(GroupedArrayHandle handle, int data_type, - void *out) { - if (data_type == DTYPE_FLOAT32) { - auto ga = reinterpret_cast *>(handle); - ga->ComputeStats(StandardScalerStats, static_cast(out)); - } else { - auto ga = reinterpret_cast *>(handle); - ga->ComputeStats(StandardScalerStats, static_cast(out)); - } +int GroupedArrayFloat32_StandardScalerStats(GroupedArrayHandle handle, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(StandardScalerStats, 2, out, 0); + return 0; +} +int GroupedArrayFloat64_StandardScalerStats(GroupedArrayHandle handle, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(StandardScalerStats, 2, out, 0); return 0; } -int GroupedArray_RobustScalerIqrStats(GroupedArrayHandle handle, int data_type, - void *out) { - if (data_type == DTYPE_FLOAT32) { - auto ga = reinterpret_cast *>(handle); - ga->ComputeStats(RobustScalerIqrStats, static_cast(out)); - } else { - auto ga = reinterpret_cast *>(handle); - ga->ComputeStats(RobustScalerIqrStats, static_cast(out)); - } +int GroupedArrayFloat32_RobustIqrScalerStats(GroupedArrayHandle handle, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RobustScalerIqrStats, 2, out, 0); + return 0; +} +int GroupedArrayFloat64_RobustIqrScalerStats(GroupedArrayHandle handle, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RobustScalerIqrStats, 2, out, 0); return 0; } -int GroupedArray_RobustScalerMadStats(GroupedArrayHandle handle, int data_type, - void *out) { - if (data_type == DTYPE_FLOAT32) { - auto ga = reinterpret_cast *>(handle); - ga->ComputeStats(RobustScalerMadStats, static_cast(out)); - } else { - auto ga = reinterpret_cast *>(handle); - ga->ComputeStats(RobustScalerMadStats, static_cast(out)); - } +int GroupedArrayFloat32_RobustMadScalerStats(GroupedArrayHandle handle, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RobustScalerMadStats, 2, out, 0); + return 0; +} +int GroupedArrayFloat64_RobustMadScalerStats(GroupedArrayHandle handle, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RobustScalerMadStats, 2, out, 0); return 0; } -int GroupedArray_ScalerTransform(GroupedArrayHandle handle, const void *stats, - int data_type, void *out) { - if (data_type == DTYPE_FLOAT32) { - auto ga = reinterpret_cast *>(handle); - ga->ScalerTransform(CommonScalerTransform, - static_cast(stats), - static_cast(out)); - } else { - auto ga = reinterpret_cast *>(handle); - ga->ScalerTransform(CommonScalerTransform, - static_cast(stats), - static_cast(out)); - } +int GroupedArrayFloat32_ScalerTransform(GroupedArrayHandle handle, + const float *stats, float *out) { + auto ga = reinterpret_cast *>(handle); + ga->ScalerTransform(CommonScalerTransform, stats, out); + return 0; +} +int GroupedArrayFloat64_ScalerTransform(GroupedArrayHandle handle, + const double *stats, double *out) { + auto ga = reinterpret_cast *>(handle); + ga->ScalerTransform(CommonScalerTransform, stats, out); return 0; } -int GroupedArray_ScalerInverseTransform(GroupedArrayHandle handle, - const void *stats, int data_type, - void *out) { - if (data_type == DTYPE_FLOAT32) { - auto ga = reinterpret_cast *>(handle); - ga->ScalerTransform(CommonScalerInverseTransform, - static_cast(stats), - static_cast(out)); - } else { - auto ga = reinterpret_cast *>(handle); - ga->ScalerTransform(CommonScalerInverseTransform, - static_cast(stats), - static_cast(out)); - } +int GroupedArrayFloat32_ScalerInverseTransform(GroupedArrayHandle handle, + const float *stats, float *out) { + auto ga = reinterpret_cast *>(handle); + ga->ScalerTransform(CommonScalerInverseTransform, stats, out); + return 0; +} +int GroupedArrayFloat64_ScalerInverseTransform(GroupedArrayHandle handle, + const double *stats, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->ScalerTransform(CommonScalerInverseTransform, stats, out); + return 0; +} + +int GroupedArrayFloat32_TakeFromGroups(GroupedArrayHandle handle, int k, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(TakeFromGroups, 1, out, 0, k); + return 0; +} +int GroupedArrayFloat64_TakeFromGroups(GroupedArrayHandle handle, int k, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(TakeFromGroups, 1, out, 0, k); + return 0; +} + +int GroupedArrayFloat32_LagTransform(GroupedArrayHandle handle, int lag, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(LagTransform, lag, out); + return 0; +} +int GroupedArrayFloat64_LagTransform(GroupedArrayHandle handle, int lag, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(LagTransform, lag, out); + return 0; +} + +int GroupedArrayFloat32_RollingMeanTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(RollingMeanTransform, lag, out, window_size, + min_samples); + return 0; +} +int GroupedArrayFloat64_RollingMeanTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(RollingMeanTransform, lag, out, window_size, + min_samples); + return 0; +} + +int GroupedArrayFloat32_RollingStdTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(RollingStdTransform, lag, out, window_size, min_samples); + return 0; +} +int GroupedArrayFloat64_RollingStdTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(RollingStdTransform, lag, out, window_size, + min_samples); + return 0; +} + +int GroupedArrayFloat32_RollingMinTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(RollingMinTransform(), lag, out, window_size, + min_samples); + return 0; +} +int GroupedArrayFloat64_RollingMinTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(RollingMinTransform(), lag, out, window_size, + min_samples); + return 0; +} + +int GroupedArrayFloat32_RollingMaxTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(RollingMaxTransform(), lag, out, window_size, + min_samples); + return 0; +} +int GroupedArrayFloat64_RollingMaxTransform(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(RollingMaxTransform(), lag, out, window_size, + min_samples); + return 0; +} + +int GroupedArrayFloat32_RollingMeanUpdate(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RollingMeanUpdate(), 1, out, lag, window_size, min_samples); + return 0; +} +int GroupedArrayFloat64_RollingMeanUpdate(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RollingMeanUpdate(), 1, out, lag, window_size, + min_samples); + return 0; +} + +int GroupedArrayFloat32_RollingStdUpdate(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RollingStdUpdate(), 1, out, lag, window_size, min_samples); + return 0; +} +int GroupedArrayFloat64_RollingStdUpdate(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RollingStdUpdate(), 1, out, lag, window_size, min_samples); + return 0; +} + +int GroupedArrayFloat32_RollingMinUpdate(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RollingMinUpdate(), 1, out, lag, window_size, min_samples); + return 0; +} +int GroupedArrayFloat64_RollingMinUpdate(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RollingMinUpdate(), 1, out, lag, window_size, min_samples); + return 0; +} + +int GroupedArrayFloat32_RollingMaxUpdate(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RollingMaxUpdate(), 1, out, lag, window_size, min_samples); + return 0; +} +int GroupedArrayFloat64_RollingMaxUpdate(GroupedArrayHandle handle, int lag, + int window_size, int min_samples, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RollingMaxUpdate(), 1, out, lag, window_size, min_samples); + return 0; +} + +int GroupedArrayFloat32_SeasonalRollingMeanTransform(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(SeasonalRollingMeanTransform(), lag, out, season_length, + window_size, min_samples); + return 0; +} +int GroupedArrayFloat64_SeasonalRollingMeanTransform(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(SeasonalRollingMeanTransform(), lag, out, season_length, + window_size, min_samples); + return 0; +} + +int GroupedArrayFloat32_SeasonalRollingStdTransform(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(SeasonalRollingStdTransform(), lag, out, season_length, + window_size, min_samples); + return 0; +} +int GroupedArrayFloat64_SeasonalRollingStdTransform(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(SeasonalRollingStdTransform(), lag, out, season_length, + window_size, min_samples); + return 0; +} + +int GroupedArrayFloat32_SeasonalRollingMinTransform(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(SeasonalRollingMinTransform(), lag, out, season_length, + window_size, min_samples); + return 0; +} +int GroupedArrayFloat64_SeasonalRollingMinTransform(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(SeasonalRollingMinTransform(), lag, out, season_length, + window_size, min_samples); + return 0; +} + +int GroupedArrayFloat32_SeasonalRollingMaxTransform(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(SeasonalRollingMaxTransform(), lag, out, season_length, + window_size, min_samples); + return 0; +} +int GroupedArrayFloat64_SeasonalRollingMaxTransform(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(SeasonalRollingMaxTransform(), lag, out, season_length, + window_size, min_samples); + return 0; +} + +int GroupedArrayFloat32_SeasonalRollingMeanUpdate(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, float *out) { + + auto ga = reinterpret_cast *>(handle); + ga->Reduce(SeasonalRollingMeanUpdate(), 1, out, lag, season_length, + window_size, min_samples); + return 0; +} +int GroupedArrayFloat64_SeasonalRollingMeanUpdate(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, + double *out) { + + auto ga = reinterpret_cast *>(handle); + ga->Reduce(SeasonalRollingMeanUpdate(), 1, out, lag, season_length, + window_size, min_samples); + return 0; +} + +int GroupedArrayFloat32_SeasonalRollingStdUpdate(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, float *out) { + + auto ga = reinterpret_cast *>(handle); + ga->Reduce(SeasonalRollingStdUpdate(), 1, out, lag, season_length, + window_size, min_samples); + return 0; +} +int GroupedArrayFloat64_SeasonalRollingStdUpdate(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, double *out) { + + auto ga = reinterpret_cast *>(handle); + ga->Reduce(SeasonalRollingStdUpdate(), 1, out, lag, season_length, + window_size, min_samples); + return 0; +} + +int GroupedArrayFloat32_SeasonalRollingMinUpdate(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, float *out) { + + auto ga = reinterpret_cast *>(handle); + ga->Reduce(SeasonalRollingMinUpdate(), 1, out, lag, season_length, + window_size, min_samples); + return 0; +} +int GroupedArrayFloat64_SeasonalRollingMinUpdate(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, double *out) { + + auto ga = reinterpret_cast *>(handle); + ga->Reduce(SeasonalRollingMinUpdate(), 1, out, lag, season_length, + window_size, min_samples); + return 0; +} + +int GroupedArrayFloat32_SeasonalRollingMaxUpdate(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, float *out) { + + auto ga = reinterpret_cast *>(handle); + ga->Reduce(SeasonalRollingMaxUpdate(), 1, out, lag, season_length, + window_size, min_samples); + return 0; +} +int GroupedArrayFloat64_SeasonalRollingMaxUpdate(GroupedArrayHandle handle, + int lag, int season_length, + int window_size, + int min_samples, double *out) { + + auto ga = reinterpret_cast *>(handle); + ga->Reduce(SeasonalRollingMaxUpdate(), 1, out, lag, season_length, + window_size, min_samples); + return 0; +} + +int GroupedArrayFloat32_ExpandingMeanTransform(GroupedArrayHandle handle, + int lag, float *out, + float *agg) { + auto ga = reinterpret_cast *>(handle); + ga->TransformAndReduce(ExpandingMeanTransform, lag, out, 1, agg); + return 0; +} +int GroupedArrayFloat64_ExpandingMeanTransform(GroupedArrayHandle handle, + int lag, double *out, + double *agg) { + auto ga = reinterpret_cast *>(handle); + ga->TransformAndReduce(ExpandingMeanTransform, lag, out, 1, agg); + return 0; +} + +int GroupedArrayFloat32_ExpandingStdTransform(GroupedArrayHandle handle, + int lag, float *out, float *agg) { + auto ga = reinterpret_cast *>(handle); + ga->TransformAndReduce(ExpandingStdTransform, lag, out, 3, agg); + return 0; +} +int GroupedArrayFloat64_ExpandingStdTransform(GroupedArrayHandle handle, + int lag, double *out, + double *agg) { + auto ga = reinterpret_cast *>(handle); + ga->TransformAndReduce(ExpandingStdTransform, lag, out, 3, agg); + return 0; +} + +int GroupedArrayFloat32_ExpandingMinTransform(GroupedArrayHandle handle, + int lag, float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(ExpandingMinTransform(), lag, out); + return 0; +} +int GroupedArrayFloat64_ExpandingMinTransform(GroupedArrayHandle handle, + int lag, double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(ExpandingMinTransform(), lag, out); + return 0; +} + +int GroupedArrayFloat32_ExpandingMaxTransform(GroupedArrayHandle handle, + int lag, float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(ExpandingMaxTransform(), lag, out); + + return 0; +} +int GroupedArrayFloat64_ExpandingMaxTransform(GroupedArrayHandle handle, + int lag, double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(ExpandingMaxTransform(), lag, out); + + return 0; +} + +int GroupedArrayFloat32_ExponentiallyWeightedMeanTransform( + GroupedArrayHandle handle, int lag, float alpha, float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(ExponentiallyWeightedMeanTransform, lag, out, alpha); + return 0; +} +int GroupedArrayFloat64_ExponentiallyWeightedMeanTransform( + GroupedArrayHandle handle, int lag, double alpha, double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(ExponentiallyWeightedMeanTransform, lag, out, alpha); return 0; } diff --git a/tests/test_lag_transforms.py b/tests/test_lag_transforms.py new file mode 100644 index 0000000..4b246e6 --- /dev/null +++ b/tests/test_lag_transforms.py @@ -0,0 +1,93 @@ +import numpy as np +import pytest +from window_ops.expanding import * +from window_ops.ewm import ewm_mean +from window_ops.rolling import * +from window_ops.shift import shift_array + +from coreforecast.grouped_array import GroupedArray +from coreforecast.lag_transforms import * + +lag = 2 +season_length = 7 +window_size = 4 +min_samples = 2 +lengths = np.random.randint(low=100, high=200, size=100) +indptr = np.append(0, lengths.cumsum()).astype(np.int32) +data = 10 * np.random.rand(indptr[-1]) + + +def transform(data, indptr, updates_only, lag, func, *args) -> np.ndarray: + n_series = len(indptr) - 1 + if updates_only: + out = np.empty_like(data[:n_series]) + for i in range(n_series): + lagged = shift_array(data[indptr[i] : indptr[i + 1]], lag) + out[i] = func(lagged, *args)[-1] + else: + out = np.empty_like(data) + for i in range(n_series): + lagged = shift_array(data[indptr[i] : indptr[i + 1]], lag) + out[indptr[i] : indptr[i + 1]] = func(lagged, *args) + return out + + +@pytest.fixture +def data(): + return 10 * np.random.rand(indptr[-1]) + + +combs_map = { + "rolling_mean": (rolling_mean, RollingMean, [window_size, min_samples]), + "rolling_std": (rolling_std, RollingStd, [window_size, min_samples]), + "rolling_min": (rolling_min, RollingMin, [window_size, min_samples]), + "rolling_max": (rolling_max, RollingMax, [window_size, min_samples]), + "seasonal_rolling_mean": ( + seasonal_rolling_mean, + SeasonalRollingMean, + [season_length, window_size, min_samples], + ), + "seasonal_rolling_std": ( + seasonal_rolling_std, + SeasonalRollingStd, + [season_length, window_size, min_samples], + ), + "seasonal_rolling_min": ( + seasonal_rolling_min, + SeasonalRollingMin, + [season_length, window_size, min_samples], + ), + "seasonal_rolling_max": ( + seasonal_rolling_max, + SeasonalRollingMax, + [season_length, window_size, min_samples], + ), + "expanding_mean": (expanding_mean, ExpandingMean, []), + "expanding_std": (expanding_std, ExpandingStd, []), + "expanding_min": (expanding_min, ExpandingMin, []), + "expanding_max": (expanding_max, ExpandingMax, []), + "ewm_mean": (ewm_mean, ExponentiallyWeightedMean, [0.8]), +} + + +@pytest.mark.parametrize("comb", list(combs_map.keys())) +@pytest.mark.parametrize("dtype", [np.float32, np.float64]) +def test_correctness(data, comb, dtype): + rtol = 1e-5 if dtype == np.float32 else 1e-7 + if dtype == np.float32: + if "rolling_std" in comb: + rtol = 1e-2 + elif "expanding_std" in comb: + rtol = 5e-5 + data = data.astype(dtype, copy=True) + ga = GroupedArray(data, indptr) + wf, cf, args = combs_map[comb] + # transform + wres = transform(data, indptr, False, lag, wf, *args) + cobj = cf(lag, *args) + cres = cobj.transform(ga) + np.testing.assert_allclose(wres, cres, rtol=rtol) + # update + wres = transform(data, indptr, True, lag - 1, wf, *args) + cres = cobj.update(ga) + np.testing.assert_allclose(wres, cres, rtol=rtol) diff --git a/tests/test_scalers.py b/tests/test_scalers.py index 8f5a842..7c88fc2 100644 --- a/tests/test_scalers.py +++ b/tests/test_scalers.py @@ -63,10 +63,11 @@ def scaler_inverse_transform(x, stats): "robust-mad": LocalRobustScaler("mad"), } scalers = list(scaler2fns.keys()) +dtypes = [np.float32, np.float64] @pytest.mark.parametrize("scaler_name", scalers) -@pytest.mark.parametrize("dtype", [np.float32, np.float64]) +@pytest.mark.parametrize("dtype", dtypes) def test_correctness(data, indptr, scaler_name, dtype): # introduce some nans at the starts of groups data = data.astype(dtype, copy=True) @@ -128,4 +129,15 @@ def test_performance(benchmark, data, indptr, scaler_name, dtype, lib): scaler = scaler2core[scaler_name] else: scaler = scaler2utils[scaler_name] - benchmark(lambda: scaler.fit(ga)) + benchmark(scaler.fit, ga) + + +@pytest.mark.parametrize("scaler_name", scalers) +@pytest.mark.parametrize("dtype", dtypes) +@pytest.mark.parametrize("num_threads", [1, 2]) +def test_multithreaded_performance( + benchmark, data, indptr, scaler_name, dtype, num_threads +): + ga = GroupedArray(data.astype(dtype), indptr, num_threads=num_threads) + scaler = scaler2core[scaler_name] + benchmark(scaler.fit, ga)