From 2172b86054aaf0d06ae07f214bc4fbde50a0a210 Mon Sep 17 00:00:00 2001
From: sronilsson
Date: Thu, 2 Nov 2023 00:35:29 +0000
Subject: [PATCH] cleaned

---
 simba/mixins/timeseries_features_mixin.py | 498 +++++++++++++---------
 1 file changed, 302 insertions(+), 196 deletions(-)

diff --git a/simba/mixins/timeseries_features_mixin.py b/simba/mixins/timeseries_features_mixin.py
index 2bdb5be59..790cdafd6 100644
--- a/simba/mixins/timeseries_features_mixin.py
+++ b/simba/mixins/timeseries_features_mixin.py
@@ -1,13 +1,15 @@
 __author__ = "Simon Nilsson"
 
-from numba import njit, prange, types, typed, jit
-from numba.typed import List, Dict
 import numpy as np
+from numba import jit, njit, prange, typed, types
+from numba.typed import Dict, List
+
 try:
     from typing import Literal
 except:
     from typing_extensions import Literal
 
+
 class TimeseriesFeatureMixin(object):
 
     """
@@ -32,7 +34,7 @@ def __init__(self):
         pass
 
     @staticmethod
-    @njit('(float32[:],)')
+    @njit("(float32[:],)")
     def hjort_parameters(data: np.ndarray):
         """
         Jitted compute of Hjorth parameters for a given time series. Hjorth parameters describe
@@ -73,10 +75,10 @@ def diff(x):
         return activity, mobility, complexity
 
     @staticmethod
-    @njit('(float32[:], float64[:], int64)')
-    def sliding_hjort_parameters(data: np.ndarray,
-                                 window_sizes: np.ndarray,
-                                 sample_rate: int) -> np.ndarray:
+    @njit("(float32[:], float64[:], int64)")
+    def sliding_hjort_parameters(
+        data: np.ndarray, window_sizes: np.ndarray, sample_rate: int
+    ) -> np.ndarray:
         """
         Jitted compute of Hjorth parameters, including mobility, complexity, and activity, for sliding windows of varying sizes applied to the input data array.
@@ -93,7 +95,9 @@ def sliding_hjort_parameters(data: np.ndarray,
         results = np.full((3, data.shape[0], window_sizes.shape[0]), -1.0)
         for i in range(window_sizes.shape[0]):
             window_size = int(window_sizes[i] * sample_rate)
-            for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)):
+            for l, r in zip(
+                prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)
+            ):
                 sample = data[l:r]
                 dx = sample[1:] - sample[:-1]
                 ddx = dx[1:] - dx[:-1]
@@ -109,7 +113,7 @@ def sliding_hjort_parameters(data: np.ndarray,
         return results.astype(np.float32)
 
     @staticmethod
-    @njit('(float32[:], boolean)')
+    @njit("(float32[:], boolean)")
     def local_maxima_minima(data: np.ndarray, maxima: bool) -> np.ndarray:
         """
         Jitted compute of the local maxima or minima, defined as values which are higher or lower than their immediately preceding and succeeding time-series neighbors, respectively.
@@ -154,7 +158,7 @@ def local_maxima_minima(data: np.ndarray, maxima: bool) -> np.ndarray:
         return results[np.argwhere(results[:, 0].T != -1).flatten()]
 
     @staticmethod
-    @njit('(float32[:], float64)')
+    @njit("(float32[:], float64)")
     def crossings(data: np.ndarray, val: float) -> int:
         """
         Jitted compute of the count in a time-series where sequential values cross a defined value.
@@ -187,11 +191,10 @@ def crossings(data: np.ndarray, val: float) -> int:
         return cnt
 
     @staticmethod
-    @njit('(float32[:], float64, float64[:], int64,)')
-    def sliding_crossings(data: np.ndarray,
-                          val: float,
-                          window_sizes: np.ndarray,
-                          sample_rate: int) -> np.ndarray:
+    @njit("(float32[:], float64, float64[:], int64,)")
+    def sliding_crossings(
+        data: np.ndarray, val: float, window_sizes: np.ndarray, sample_rate: int
+    ) -> np.ndarray:
         """
         Compute the number of crossings over sliding windows in a data array.
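# A minimal usage sketch for the sliding_crossings method above, in the same style
# as the doctest examples elsewhere in this file; the toy trace, the 7.0 threshold,
# and the 2 Hz sample rate are illustrative assumptions, not values from the patch.
# Each row r of the returned int32 array holds the crossing count for the window
# ending at sample r, one column per window size.
# >>> data = np.array([3.9, 7.5, 4.2, 6.2, 7.5, 3.9, 6.2, 6.5, 7.2, 9.5]).astype(np.float32)
# >>> TimeseriesFeatureMixin().sliding_crossings(data=data, val=7.0, window_sizes=np.array([1.0]), sample_rate=2)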
@@ -209,7 +212,9 @@ def sliding_crossings(data: np.ndarray,
         results = np.full((data.shape[0], window_sizes.shape[0]), -1.0)
         for i in prange(window_sizes.shape[0]):
             window_size = int(window_sizes[i] * sample_rate)
-            for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)):
+            for l, r in zip(
+                prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)
+            ):
                 sample = data[l:r]
                 cnt, last_val = 0, -1
                 if sample[0] > val:
                     last_val = 1
@@ -226,8 +231,10 @@ def sliding_crossings(data: np.ndarray,
         return results.astype(np.int32)
 
     @staticmethod
-    @njit('(float32[:], int64, int64, )', cache=True, fastmath=True)
-    def percentile_difference(data: np.ndarray, upper_pct: int, lower_pct: int) -> float:
+    @njit("(float32[:], int64, int64, )", cache=True, fastmath=True)
+    def percentile_difference(
+        data: np.ndarray, upper_pct: int, lower_pct: int
+    ) -> float:
         """
         Jitted compute of the difference between the ``upper`` and ``lower`` percentiles of the data as a percentage of the median value.
@@ -251,16 +258,20 @@ def percentile_difference(data: np.ndarray, upper_pct: int, lower_pct: int) -> f
         """
 
-        upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(data, lower_pct)
+        upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(
+            data, lower_pct
+        )
         return np.abs(upper_val - lower_val) / np.median(data)
 
     @staticmethod
-    @njit('(float32[:], int64, int64, float64[:], int64, )', cache=True, fastmath=True)
-    def sliding_percentile_difference(data: np.ndarray,
-                                      upper_pct: int,
-                                      lower_pct: int,
-                                      window_sizes: np.ndarray,
-                                      sample_rate: int) -> np.ndarray:
+    @njit("(float32[:], int64, int64, float64[:], int64, )", cache=True, fastmath=True)
+    def sliding_percentile_difference(
+        data: np.ndarray,
+        upper_pct: int,
+        lower_pct: int,
+        window_sizes: np.ndarray,
+        sample_rate: int,
+    ) -> np.ndarray:
         """
         Jitted compute of the difference between the upper and lower percentiles within a sliding window for each position
         in the time series using various window sizes. It returns a 2D array where each row corresponds to a position in the time series,
@@ -277,9 +288,13 @@ def sliding_percentile_difference(data: np.ndarray,
         results = np.full((data.shape[0], window_sizes.shape[0]), -1.0)
         for i in prange(window_sizes.shape[0]):
             window_size = int(window_sizes[i] * sample_rate)
-            for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)):
+            for l, r in zip(
+                prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)
+            ):
                 sample = data[l:r]
-                upper_val, lower_val = np.percentile(sample, upper_pct), np.percentile(sample, lower_pct)
+                upper_val, lower_val = np.percentile(sample, upper_pct), np.percentile(
+                    sample, lower_pct
+                )
                 median = np.median(sample)
                 if median != 0:
                     results[r - 1, i] = np.abs(upper_val - lower_val) / median
@@ -289,7 +304,7 @@ def sliding_percentile_difference(data: np.ndarray,
         return results.astype(np.float32)
 
     @staticmethod
-    @njit('(float32[:], float64,)', cache=True, fastmath=True)
+    @njit("(float32[:], float64,)", cache=True, fastmath=True)
     def percent_beyond_n_std(data: np.ndarray, n: float) -> float:
         """
         Jitted compute of the ratio of values in time-series more than N standard deviations from the mean of the time-series.
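# A hedged usage sketch for the sliding_percentile_difference method above; the
# random draw, the 95/5 percentile pair, and the 10 Hz sample rate are assumptions
# for illustration only.
# >>> data = np.random.randint(1, 50, (100,)).astype(np.float32)
# >>> TimeseriesFeatureMixin().sliding_percentile_difference(data=data, upper_pct=95, lower_pct=5, window_sizes=np.array([1.0]), sample_rate=10)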
@@ -316,12 +331,10 @@ def percent_beyond_n_std(data: np.ndarray, n: float) -> float:
         return np.argwhere(np.abs(data) > target).shape[0] / data.shape[0]
 
     @staticmethod
-    @njit('(float32[:], float64, float64[:], int64,)', cache=True, fastmath=True)
-    def sliding_percent_beyond_n_std(data: np.ndarray,
-                                     n: float,
-                                     window_sizes: np.ndarray,
-                                     sample_rate: int) -> np.ndarray:
-
+    @njit("(float32[:], float64, float64[:], int64,)", cache=True, fastmath=True)
+    def sliding_percent_beyond_n_std(
+        data: np.ndarray, n: float, window_sizes: np.ndarray, sample_rate: int
+    ) -> np.ndarray:
         """
         Compute the percentage of data points that exceed 'n' standard deviations from the mean for each position in the
         time series using various window sizes. It returns a 2D array where each row corresponds to a position in the time series,
@@ -338,14 +351,18 @@ def sliding_percent_beyond_n_std(data: np.ndarray,
         target = (np.std(data) * n) + np.mean(data)
         for i in prange(window_sizes.shape[0]):
             window_size = int(window_sizes[i] * sample_rate)
-            for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)):
+            for l, r in zip(
+                prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)
+            ):
                 sample = data[l:r]
-                results[r - 1, i] = np.argwhere(np.abs(sample) > target).shape[0] / sample.shape[0]
+                results[r - 1, i] = (
+                    np.argwhere(np.abs(sample) > target).shape[0] / sample.shape[0]
+                )
 
         return results.astype(np.float32)
 
     @staticmethod
-    @njit('(float32[:], int64, int64, )', cache=True, fastmath=True)
+    @njit("(float32[:], int64, int64, )", cache=True, fastmath=True)
     def percent_in_percentile_window(data: np.ndarray, upper_pct: int, lower_pct: int):
         """
         Jitted compute of the ratio of values in time-series that fall between the ``upper`` and ``lower`` percentiles.
@@ -369,16 +386,23 @@ def percent_in_percentile_window(data: np.ndarray, upper_pct: int, lower_pct: in
            :align: center
         """
-        upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(data, lower_pct)
-        return np.argwhere((data <= upper_val) & (data >= lower_val)).flatten().shape[0] / data.shape[0]
+        upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(
+            data, lower_pct
+        )
+        return (
+            np.argwhere((data <= upper_val) & (data >= lower_val)).flatten().shape[0]
+            / data.shape[0]
+        )
 
     @staticmethod
-    @njit('(float32[:], int64, int64, float64[:], int64)', cache=True, fastmath=True)
-    def sliding_percent_in_percentile_window(data: np.ndarray,
-                                             upper_pct: int,
-                                             lower_pct: int,
-                                             window_sizes: np.ndarray,
-                                             sample_rate: int):
+    @njit("(float32[:], int64, int64, float64[:], int64)", cache=True, fastmath=True)
+    def sliding_percent_in_percentile_window(
+        data: np.ndarray,
+        upper_pct: int,
+        lower_pct: int,
+        window_sizes: np.ndarray,
+        sample_rate: int,
+    ):
         """
         Jitted compute of the percentage of data points falling within a percentile window in a sliding manner.
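# A usage sketch for the sliding_percent_beyond_n_std method above. Note that the
# target is computed once from the full series, so each window reports the share of
# its values beyond that global cutoff. The normally distributed data and the 10 Hz
# rate are illustrative assumptions.
# >>> data = np.random.normal(0, 1, 100).astype(np.float32)
# >>> TimeseriesFeatureMixin().sliding_percent_beyond_n_std(data=data, n=2.0, window_sizes=np.array([1.0]), sample_rate=10)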
@@ -395,17 +419,26 @@ def sliding_percent_in_percentile_window(data: np.ndarray,
         """
 
         results = np.full((data.shape[0], window_sizes.shape[0]), -1.0)
-        upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(data, lower_pct)
+        upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(
+            data, lower_pct
+        )
         for i in prange(window_sizes.shape[0]):
             window_size = int(window_sizes[i] * sample_rate)
-            for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)):
+            for l, r in zip(
+                prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)
+            ):
                 sample = data[l:r]
-                results[r - 1, i] = np.argwhere((sample <= upper_val) & (sample >= lower_val)).flatten().shape[0] / sample.shape[0]
+                results[r - 1, i] = (
+                    np.argwhere((sample <= upper_val) & (sample >= lower_val))
+                    .flatten()
+                    .shape[0]
+                    / sample.shape[0]
+                )
 
         return results.astype(np.float32)
 
     @staticmethod
-    @njit('(float32[:],)', fastmath=True, cache=True)
+    @njit("(float32[:],)", fastmath=True, cache=True)
     def petrosian_fractal_dimension(data: np.ndarray) -> float:
         """
         Calculate the Petrosian Fractal Dimension (PFD) of a given time series. The PFD is a measure of the
@@ -449,13 +482,16 @@ def petrosian_fractal_dimension(data: np.ndarray) -> float:
         if zC == 0:
             return -1.0
 
-        return np.log10(data.shape[0]) / (np.log10(data.shape[0]) + np.log10(data.shape[0] / (data.shape[0] + 0.4 * zC)))
+        return np.log10(data.shape[0]) / (
+            np.log10(data.shape[0])
+            + np.log10(data.shape[0] / (data.shape[0] + 0.4 * zC))
+        )
 
     @staticmethod
-    @njit('(float32[:], float64[:], int64)', fastmath=True, cache=True)
-    def sliding_petrosian_fractal_dimension(data: np.ndarray,
-                                            window_sizes: np.ndarray,
-                                            sample_rate: int) -> np.ndarray:
+    @njit("(float32[:], float64[:], int64)", fastmath=True, cache=True)
+    def sliding_petrosian_fractal_dimension(
+        data: np.ndarray, window_sizes: np.ndarray, sample_rate: int
+    ) -> np.ndarray:
         """
         Jitted compute of Petrosian Fractal Dimension over sliding windows in a data array.
 
@@ -473,8 +509,12 @@ def sliding_petrosian_fractal_dimension(data: np.ndarray,
         results = np.full((data.shape[0], window_sizes.shape[0]), -1.0)
         for i in prange(window_sizes.shape[0]):
             window_size = int(window_sizes[i] * sample_rate)
-            for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)):
-                sample = (data[l:r] - np.min(data[l:r])) / (np.max(data[l:r]) - np.min(data[l:r]))
+            for l, r in zip(
+                prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)
+            ):
+                sample = (data[l:r] - np.min(data[l:r])) / (
+                    np.max(data[l:r]) - np.min(data[l:r])
+                )
                 derivative = sample[1:] - sample[:-1]
                 if derivative.shape[0] == 0:
                     results[r - 1, i] = -1.0
@@ -492,13 +532,15 @@ def sliding_petrosian_fractal_dimension(data: np.ndarray,
                     if zC == 0:
                         results[r - 1, i] = -1.0
                     else:
-                        results[r - 1, i] = np.log10(sample.shape[0]) / (np.log10(sample.shape[0]) + np.log10(
-                            sample.shape[0] / (sample.shape[0] + 0.4 * zC)))
+                        results[r - 1, i] = np.log10(sample.shape[0]) / (
+                            np.log10(sample.shape[0])
+                            + np.log10(sample.shape[0] / (sample.shape[0] + 0.4 * zC))
+                        )
 
         return results.astype(np.float32)
 
     @staticmethod
-    @njit('(float32[:], int64)')
+    @njit("(float32[:], int64)")
     def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10):
         """
         Jitted compute of the Higuchi Fractal Dimension of a given time series.
 The Higuchi Fractal Dimension provides a measure of the fractal
@@ -529,8 +571,12 @@ def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10):
         """
 
         L, N = np.zeros(kmax - 1), len(data)
-        x = np.hstack((-np.log(np.arange(2, kmax + 1)).reshape(-1, 1).astype(np.float32),
-                       np.ones(kmax - 1).reshape(-1, 1).astype(np.float32)))
+        x = np.hstack(
+            (
+                -np.log(np.arange(2, kmax + 1)).reshape(-1, 1).astype(np.float32),
+                np.ones(kmax - 1).reshape(-1, 1).astype(np.float32),
+            )
+        )
         for k in prange(2, kmax + 1):
             Lk = np.zeros(k)
             for m in range(0, k):
@@ -545,9 +591,8 @@ def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10):
         return np.linalg.lstsq(x, L.astype(np.float32))[0][0]
 
     @staticmethod
-    @njit('(float32[:], int64, int64,)', fastmath=True)
+    @njit("(float32[:], int64, int64,)", fastmath=True)
     def permutation_entropy(data: np.ndarray, dimension: int, delay: int) -> float:
-
         """
         Calculate the permutation entropy of a time series.
@@ -609,7 +654,7 @@ def permutation_entropy(data: np.ndarray, dimension: int, delay: int) -> float:
         return -np.sum(probs * np.log(probs))
 
     @staticmethod
-    @njit('(float32[:],)', fastmath=True)
+    @njit("(float32[:],)", fastmath=True)
     def line_length(data: np.ndarray) -> float:
         """
         Calculate the line length of a 1D array.
@@ -645,11 +690,10 @@ def line_length(data: np.ndarray) -> float:
         return np.sum(diff[1:])
 
     @staticmethod
-    @njit('(float32[:], float64[:], int64)', fastmath=True)
-    def sliding_line_length(data: np.ndarray,
-                            window_sizes: np.ndarray,
-                            sample_rate: int) -> np.ndarray:
-
+    @njit("(float32[:], float64[:], int64)", fastmath=True)
+    def sliding_line_length(
+        data: np.ndarray, window_sizes: np.ndarray, sample_rate: int
+    ) -> np.ndarray:
         """
         Jitted compute of sliding line length for a given time series using different window sizes.
 
@@ -674,16 +718,16 @@ def sliding_line_length(data: np.ndarray,
         results = np.full((data.shape[0], window_sizes.shape[0]), -1.0)
         for i in prange(window_sizes.shape[0]):
             window_size = int(window_sizes[i] * sample_rate)
-            for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)):
+            for l, r in zip(
+                prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)
+            ):
                 sample = data[l:r]
                 results[r - 1, i] = np.sum(np.abs(np.diff(sample.astype(np.float64))))
         return results.astype(np.float32)
-    
+
     @staticmethod
-    @njit('(float32[:], float64[:], int64)', fastmath=True, cache=True)
-    def sliding_variance(data: np.ndarray,
-                         window_sizes: np.ndarray,
-                         sample_rate: int):
+    @njit("(float32[:], float64[:], int64)", fastmath=True, cache=True)
+    def sliding_variance(data: np.ndarray, window_sizes: np.ndarray, sample_rate: int):
         """
         Jitted compute of the variance of data within sliding windows of varying sizes applied to the input data array.
         Variance is a measure of data dispersion or spread.
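# A sketch contrasting permutation_entropy (defined above) on an ordered versus a
# shuffled signal, mirroring the commented petrosian example at the bottom of this
# file; the sine parameters and the embedding settings are illustrative assumptions.
# >>> t = np.linspace(0, 50, 1000)
# >>> sine_wave = np.sin(2 * np.pi * 1.0 * t).astype(np.float32)
# >>> TimeseriesFeatureMixin().permutation_entropy(data=sine_wave, dimension=3, delay=1)
# >>> np.random.shuffle(sine_wave)
# >>> TimeseriesFeatureMixin().permutation_entropy(data=sine_wave, dimension=3, delay=1)  # a larger value is expected for the shuffled series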
@@ -697,19 +741,42 @@ def sliding_variance(data: np.ndarray,
         results = np.full((data.shape[0], window_sizes.shape[0]), -1.0)
         for i in prange(window_sizes.shape[0]):
             window_size = int(window_sizes[i] * sample_rate)
-            for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)):
-                sample = (data[l:r] - np.min(data[l:r])) / (np.max(data[l:r]) - np.min(data[l:r]))
+            for l, r in zip(
+                prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)
+            ):
+                sample = (data[l:r] - np.min(data[l:r])) / (
+                    np.max(data[l:r]) - np.min(data[l:r])
+                )
                 results[r - 1, i] = np.var(sample)
 
         return results.astype(np.float32)
 
     @staticmethod
-    @njit('(float32[:], float64[:], int64, types.ListType(types.unicode_type))', fastmath=True, cache=True)
-    def sliding_descriptive_statistics(data: np.ndarray,
-                                       window_sizes: np.ndarray,
-                                       sample_rate: int,
-                                       statistics: List[Literal['var', 'max', 'min', 'std', 'median', 'mean', 'mad', 'sum', 'mac', 'rms', 'absenergy']]) -> np.ndarray:
-
+    @njit(
+        "(float32[:], float64[:], int64, types.ListType(types.unicode_type))",
+        fastmath=True,
+        cache=True,
+    )
+    def sliding_descriptive_statistics(
+        data: np.ndarray,
+        window_sizes: np.ndarray,
+        sample_rate: int,
+        statistics: List[
+            Literal[
+                "var",
+                "max",
+                "min",
+                "std",
+                "median",
+                "mean",
+                "mad",
+                "sum",
+                "mac",
+                "rms",
+                "absenergy",
+            ]
+        ],
+    ) -> np.ndarray:
         """
         Jitted compute of descriptive statistics over sliding windows in a 1D data array.
 
@@ -739,59 +806,61 @@ def sliding_descriptive_statistics(data: np.ndarray,
         for j in prange(len(statistics)):
             for i in prange(window_sizes.shape[0]):
                 window_size = int(window_sizes[i] * sample_rate)
-                for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)):
+                for l, r in zip(
+                    prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)
+                ):
                     sample = data[l:r]
-                    if statistics[j] == 'var':
+                    if statistics[j] == "var":
                         results[j, r - 1, i] = np.var(sample)
-                    elif statistics[j] == 'max':
+                    elif statistics[j] == "max":
                         results[j, r - 1, i] = np.max(sample)
-                    elif statistics[j] == 'min':
+                    elif statistics[j] == "min":
                         results[j, r - 1, i] = np.min(sample)
-                    elif statistics[j] == 'std':
+                    elif statistics[j] == "std":
                         results[j, r - 1, i] = np.std(sample)
-                    elif statistics[j] == 'median':
+                    elif statistics[j] == "median":
                         results[j, r - 1, i] = np.median(sample)
-                    elif statistics[j] == 'mean':
+                    elif statistics[j] == "mean":
                         results[j, r - 1, i] = np.mean(sample)
-                    elif statistics[j] == 'mad':
-                        results[j, r - 1, i] = np.median(np.abs(sample - np.median(sample)))
-                    elif statistics[j] == 'sum':
+                    elif statistics[j] == "mad":
+                        results[j, r - 1, i] = np.median(
+                            np.abs(sample - np.median(sample))
+                        )
+                    elif statistics[j] == "sum":
                         results[j, r - 1, i] = np.sum(sample)
-                    elif statistics[j] == 'mac':
+                    elif statistics[j] == "mac":
                         results[j, r - 1, i] = np.mean(np.abs(sample[1:] - sample[:-1]))
-                    elif statistics[j] == 'rms':
-                        results[j, r - 1, i] = np.sqrt(np.mean(sample ** 2))
-                    elif statistics[j] == 'absenergy':
-                        results[j, r - 1, i] = np.sqrt(np.sum(sample ** 2))
+                    elif statistics[j] == "rms":
+                        results[j, r - 1, i] = np.sqrt(np.mean(sample**2))
+                    elif statistics[j] == "absenergy":
+                        results[j, r - 1, i] = np.sqrt(np.sum(sample**2))
 
         return results.astype(np.float32)
 
     @staticmethod
-    def dominant_frequencies(data: np.ndarray,
-                             fps: float,
-                             k: int,
-                             window_function: Literal['Hann', 'Hamming', 'Blackman'] = None):
-
-        """ Find the K dominant frequencies within a feature vector """
-
-        if window_function == 'Hann':
+    def dominant_frequencies(
+        data: np.ndarray,
+        fps: float,
+        k: int,
+        window_function: Literal["Hann", "Hamming", "Blackman"] = None,
+    ):
+        """Find the K dominant frequencies within a feature vector"""
+
+        if window_function == "Hann":
             data = data * np.hanning(len(data))
-        elif window_function == 'Hamming':
+        elif window_function == "Hamming":
             data = data * np.hamming(len(data))
-        elif window_function == 'Blackman':
+        elif window_function == "Blackman":
             data = data * np.blackman(len(data))
 
         fft_result = np.fft.fft(data)
         frequencies = np.fft.fftfreq(data.shape[0], 1 / fps)
         magnitude = np.abs(fft_result)
-        return frequencies[np.argsort(magnitude)[-(k + 1):-1]]
+        return frequencies[np.argsort(magnitude)[-(k + 1) : -1]]
 
     @staticmethod
-    @njit('(float32[:], float64, boolean,)')
-    def longest_strike(data: np.ndarray,
-                       threshold: float,
-                       above: bool):
-
+    @njit("(float32[:], float64, boolean,)")
+    def longest_strike(data: np.ndarray, threshold: float, above: bool):
         """
         Jitted compute of the length of the longest consecutive sequence of values in the input data that either exceed or fall below a specified threshold.
 
@@ -823,20 +892,22 @@ def longest_strike(data: np.ndarray,
                     cnt, r = cnt + 1, r + 1
                 l += 1
 
-            if cnt > result: result = cnt
+            if cnt > result:
+                result = cnt
             if data.shape[0] - l < result:
                 break
             r, cnt = l, 0
 
         return result
 
     @staticmethod
-    @njit('(float32[:], float64, float64[:], int64, boolean,)')
-    def sliding_longest_strike(data: np.ndarray,
-                               threshold: float,
-                               time_windows: np.ndarray,
-                               sample_rate: int,
-                               above: bool) -> np.ndarray:
-
+    @njit("(float32[:], float64, float64[:], int64, boolean,)")
+    def sliding_longest_strike(
+        data: np.ndarray,
+        threshold: float,
+        time_windows: np.ndarray,
+        sample_rate: int,
+        above: bool,
+    ) -> np.ndarray:
         """
         Jitted compute of the length of the longest strike of values within sliding time windows that satisfy a given condition.
 
@@ -864,7 +935,9 @@ def sliding_longest_strike(data: np.ndarray,
 
         for i in prange(time_windows.shape[0]):
             window_size = int(time_windows[i] * sample_rate)
-            for l1, r1 in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)):
+            for l1, r1 in zip(
+                prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)
+            ):
                 sample = data[l1:r1]
                 result, l, r, cnt = -np.inf, 0, 0, 0
 
@@ -881,7 +954,8 @@ def sliding_longest_strike(data: np.ndarray,
                         cnt, r = cnt + 1, r + 1
                     l += 1
 
-                if cnt > result: result = cnt
+                if cnt > result:
+                    result = cnt
                 if data.shape[0] - l < result:
                     results[r - 1, i] = result
                     break
@@ -892,11 +966,10 @@ def sliding_longest_strike(data: np.ndarray,
                 r, cnt = l, 0
 
         return results
 
     @staticmethod
-    @njit('(float32[:], float64, int64, boolean,)')
-    def time_since_previous_threshold(data: np.ndarray,
-                                      threshold: float,
-                                      sample_rate: int,
-                                      above: bool) -> np.ndarray:
+    @njit("(float32[:], float64, int64, boolean,)")
+    def time_since_previous_threshold(
+        data: np.ndarray, threshold: float, sample_rate: int, above: bool
+    ) -> np.ndarray:
         """
         Jitted compute of the time (in seconds) that has elapsed since the last occurrence of a value above (or below)
         a specified threshold in a time series. The time series is assumed to have a constant sample rate.
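# A usage sketch for the dominant_frequencies method above: a 3 Hz and a 12 Hz
# component sampled at 100 Hz, so the two returned peaks should sit near those
# frequencies (possibly with negative sign, since the full FFT spectrum is used);
# all signal parameters are illustrative assumptions.
# >>> t = np.linspace(0, 10, 1000, endpoint=False)
# >>> data = (np.sin(2 * np.pi * 3 * t) + 0.5 * np.sin(2 * np.pi * 12 * t)).astype(np.float32)
# >>> TimeseriesFeatureMixin().dominant_frequencies(data=data, fps=100, k=2, window_function="Hann")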
@@ -934,11 +1007,10 @@ def time_since_previous_threshold(data: np.ndarray,
         return results
 
     @staticmethod
-    @njit('(float32[:], float64, int64, boolean,)')
-    def time_since_previous_target_value(data: np.ndarray,
-                                         value: float,
-                                         sample_rate: int,
-                                         inverse: bool) -> np.ndarray:
+    @njit("(float32[:], float64, int64, boolean,)")
+    def time_since_previous_target_value(
+        data: np.ndarray, value: float, sample_rate: int, inverse: bool
+    ) -> np.ndarray:
         """
         Calculate the time duration (in seconds) since the previous occurrence of a specific value in a data array.
 
@@ -977,11 +1049,8 @@ def time_since_previous_target_value(data: np.ndarray,
             results[i] = (i - x[-1]) / sample_rate
         return results
 
-
-
-    @staticmethod
-    @njit('(float32[:],)')
+    @staticmethod
+    @njit("(float32[:],)")
     def benford_correlation(data: np.ndarray) -> float:
         """
         Jitted compute of the correlation between the Benford's Law distribution and the first-digit distribution of given data.
@@ -1012,7 +1081,7 @@ def benford_correlation(data: np.ndarray) -> float:
         benford_distribution = np.array([np.log10(1 + 1 / n) for n in range(1, 10)])
         first_vals, digit_ratio = np.full((data.shape[0]), np.nan), np.full(9, np.nan)
         for i in prange(data.shape[0]):
-            first_vals[i] = (data[i] // 10 ** (int(np.log10(data[i])) - 1 + 1))
+            first_vals[i] = data[i] // 10 ** (int(np.log10(data[i])) - 1 + 1)
 
         for i in range(1, 10):
             digit_ratio[i - 1] = np.argwhere(first_vals == i).shape[0] / data.shape[0]
@@ -1020,10 +1089,10 @@ def benford_correlation(data: np.ndarray) -> float:
         return np.corrcoef(benford_distribution, digit_ratio)[0, 1]
 
     @staticmethod
-    @njit('(float32[:], float64[:], int64)')
-    def sliding_benford_correlation(data: np.ndarray,
-                                    time_windows: np.ndarray,
-                                    sample_rate: int) -> np.ndarray:
+    @njit("(float32[:], float64[:], int64)")
+    def sliding_benford_correlation(
+        data: np.ndarray, time_windows: np.ndarray, sample_rate: int
+    ) -> np.ndarray:
         """
         Calculate the sliding Benford's Law correlation coefficient for a given dataset within specified time windows.
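# A usage sketch for the benford_correlation method above; the uniform random draw
# is an illustrative assumption (uniformly distributed first digits are expected to
# match Benford's distribution poorly, so the coefficient should be well below 1).
# >>> data = np.random.randint(1, 10000, (1000,)).astype(np.float32)
# >>> TimeseriesFeatureMixin().benford_correlation(data=data)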
@@ -1060,26 +1129,35 @@ def sliding_benford_correlation(data: np.ndarray,
         results = np.full((data.shape[0], time_windows.shape[0]), 0.0)
         for i in prange(time_windows.shape[0]):
             window_size = int(time_windows[i] * sample_rate)
-            for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)):
-                first_vals, digit_ratio = np.full((data.shape[0]), np.nan), np.full(9, np.nan)
+            for l, r in zip(
+                prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)
+            ):
+                first_vals, digit_ratio = np.full((data.shape[0]), np.nan), np.full(
+                    9, np.nan
+                )
                 sample = data[l:r]
                 for k in range(sample.shape[0]):
-                    first_vals[k] = (sample[k] // 10 ** (int(np.log10(sample[k])) - 1 + 1))
+                    first_vals[k] = sample[k] // 10 ** (
+                        int(np.log10(sample[k])) - 1 + 1
+                    )
                 for k in range(1, 10):
-                    digit_ratio[k - 1] = np.argwhere(first_vals == k).shape[0] / sample.shape[0]
+                    digit_ratio[k - 1] = (
+                        np.argwhere(first_vals == k).shape[0] / sample.shape[0]
+                    )
                 results[r - 1, i] = np.corrcoef(benford_distribution, digit_ratio)[0, 1]
 
         return results.astype(np.float32)
 
     @staticmethod
-    @njit('(float32[:], float64, float64, float64, float64, float64)', fastmath=True)
-    def spike_finder(data: np.ndarray,
-                     sample_rate: int,
-                     baseline: float,
-                     min_spike_amplitude: float,
-                     min_fwhm: float = -np.inf,
-                     min_half_width: float = -np.inf) -> float:
-
+    @njit("(float32[:], float64, float64, float64, float64, float64)", fastmath=True)
+    def spike_finder(
+        data: np.ndarray,
+        sample_rate: int,
+        baseline: float,
+        min_spike_amplitude: float,
+        min_fwhm: float = -np.inf,
+        min_half_width: float = -np.inf,
+    ) -> float:
         """
         Identify and characterize spikes in a given time-series data sequence. This method identifies spikes in the input data based on the specified criteria and characterizes each detected spike by computing its amplitude, full-width at half maximum (FWHM), and half-width.
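# A hedged usage sketch for the sliding_benford_correlation method above; the data,
# the window length, and the sample rate are illustrative assumptions (a 1 s window
# at 30 Hz spans 30 samples).
# >>> data = np.random.randint(1, 1000, (3000,)).astype(np.float32)
# >>> TimeseriesFeatureMixin().sliding_benford_correlation(data=data, time_windows=np.array([1.0]), sample_rate=30)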
@@ -1104,24 +1182,33 @@ def spike_finder(data: np.ndarray,
         """
 
         spike_idxs = np.argwhere(data >= baseline + min_spike_amplitude).flatten()
-        spike_idx = np.split(spike_idxs, np.argwhere(spike_idxs[1:] - spike_idxs[:-1] != 1).flatten() + 1)
-        spike_dict = Dict.empty(key_type=types.int64,
-                                value_type=Dict.empty(key_type=types.unicode_type, value_type=types.float64))
+        spike_idx = np.split(
+            spike_idxs, np.argwhere(spike_idxs[1:] - spike_idxs[:-1] != 1).flatten() + 1
+        )
+        spike_dict = Dict.empty(
+            key_type=types.int64,
+            value_type=Dict.empty(
+                key_type=types.unicode_type, value_type=types.float64
+            ),
+        )
         spike_vals = []
         for i in prange(len(spike_idx)):
             spike_data = data[spike_idx[i]]
             spike_amplitude = np.max(spike_data) - baseline
             half_width_idx = np.argwhere(spike_data > spike_amplitude / 2).flatten()
-            spike_dict[i] = {'amplitude': np.max(spike_data) - baseline,
-                             'fwhm': (half_width_idx[-1] - half_width_idx[0]) / sample_rate,
-                             'half_width': half_width_idx.shape[0] / sample_rate}
+            spike_dict[i] = {
+                "amplitude": np.max(spike_data) - baseline,
+                "fwhm": (half_width_idx[-1] - half_width_idx[0]) / sample_rate,
+                "half_width": half_width_idx.shape[0] / sample_rate,
+            }
             spike_vals.append(spike_data)
 
         remove_idx = []
         for k, v in spike_dict.items():
-            if (v['fwhm'] < min_fwhm) or (v['half_width'] < min_half_width):
+            if (v["fwhm"] < min_fwhm) or (v["half_width"] < min_half_width):
                 remove_idx.append(k)
-        for idx in remove_idx: spike_dict.pop(idx)
+        for idx in remove_idx:
+            spike_dict.pop(idx)
         spike_idx = [i for j, i in enumerate(spike_idx) if j not in remove_idx]
         spike_vals = [i for j, i in enumerate(spike_vals) if j not in remove_idx]
 
@@ -1130,11 +1217,13 @@ def spike_finder(data: np.ndarray,
     # @njit("(float32[:], types.List(types.Array(types.int64, 1, 'C')), int64, float64, float64)", fastmath=True)
     @staticmethod
     @jit(nopython=True, fastmath=True)
-    def spike_train_finder(data: np.ndarray,
-                           spike_idx: list,
-                           sample_rate: float,
-                           min_spike_train_length: float = np.inf,
-                           max_spike_train_separation: float = np.inf):
+    def spike_train_finder(
+        data: np.ndarray,
+        spike_idx: list,
+        sample_rate: float,
+        min_spike_train_length: float = np.inf,
+        max_spike_train_separation: float = np.inf,
+    ):
         """
         Identify and analyze spike trains from a list of spike indices.
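# A usage sketch for the spike_finder method above; the trace, baseline, and
# thresholds are illustrative assumptions, and the exact structure of the returned
# values is not shown in this hunk, so the result is captured without unpacking.
# >>> data = np.array([0.1, 0.2, 0.1, 2.4, 2.9, 2.7, 0.2, 0.1, 3.8, 4.0, 0.2]).astype(np.float32)
# >>> results = TimeseriesFeatureMixin().spike_finder(data=data, sample_rate=10.0, baseline=0.3, min_spike_amplitude=1.0, min_fwhm=-np.inf, min_half_width=-np.inf)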
@@ -1180,14 +1269,22 @@ def spike_train_finder(data: np.ndarray,
         >>> results = TimeseriesFeatureMixin().spike_train_finder(data=data, spike_idx=typed.List(spike_idx), sample_rate=2.0, min_spike_train_length=2.0, max_spike_train_separation=2.0)
         """
 
-        l, r, = 0, 1
+        l, r = 0, 1
         train_data_idx = []
         train_spikes_idx = []
         max_spike_train_separation = int(max_spike_train_separation * sample_rate)
         min_spike_train_length = int(min_spike_train_length * sample_rate)
         while l < len(spike_idx):
             current_train, current_spike_idx = spike_idx[l], [l]
-            while r < len(spike_idx) and ((spike_idx[r][0] - current_train[-1]) <= max_spike_train_separation):
+            while r < len(spike_idx) and (
+                (spike_idx[r][0] - current_train[-1]) <= max_spike_train_separation
+            ):
                 current_train = np.hstack((current_train, spike_idx[r]))
                 current_spike_idx.append(r)
                 r += 1
@@ -1195,46 +1292,55 @@ def spike_train_finder(data: np.ndarray,
             train_data_idx.append(current_train)
             train_spikes_idx.append(current_spike_idx)
 
-        spike_dict = Dict.empty(key_type=types.int64,
-                                value_type=Dict.empty(key_type=types.unicode_type, value_type=types.float64))
+        spike_dict = Dict.empty(
+            key_type=types.int64,
+            value_type=Dict.empty(
+                key_type=types.unicode_type, value_type=types.float64
+            ),
+        )
 
         for i in prange(len(train_data_idx)):
             if train_data_idx[i].shape[0] >= min_spike_train_length:
                 spike_train_amps = data[train_data_idx[i]]
-                spike_train_idx = [k for j, k in enumerate(spike_idx) if j in train_spikes_idx[i]]
+                spike_train_idx = [
+                    k for j, k in enumerate(spike_idx) if j in train_spikes_idx[i]
+                ]
                 train_spike_lengths = np.array(([len(j) for j in spike_train_idx]))
-                spike_dict[int(i)] = {'train_start_time': float(train_data_idx[i][0] * sample_rate),
-                                      'train_end_time': train_data_idx[i][-1] * sample_rate,
-                                      'train_start_obs': train_data_idx[i][0],
-                                      'train_end_obs': train_data_idx[i][-1],
-                                      'spike_cnt': len(train_spikes_idx[i]),
-                                      'train_length_obs_cnt': len(spike_train_amps),
-                                      'train_length_obs_s': len(spike_train_amps) / sample_rate,
-                                      'train_spike_mean_lengths_s': np.mean(train_spike_lengths) * sample_rate,
-                                      'train_spike_std_length_obs': np.mean(train_spike_lengths),
-                                      'train_spike_std_length_s': np.mean(train_spike_lengths) * sample_rate,
-                                      'train_spike_max_length_obs': np.max(train_spike_lengths),
-                                      'train_spike_max_length_s': np.max(train_spike_lengths) * sample_rate,
-                                      'train_spike_min_length_obs': np.min(train_spike_lengths),
-                                      'train_spike_min_length_s': np.min(train_spike_lengths) * sample_rate,
-                                      'train_mean_amplitude': np.mean(spike_train_amps),
-                                      'train_std_amplitude': np.std(spike_train_amps),
-                                      'train_min_amplitude': np.min(spike_train_amps),
-                                      'train_max_amplitude': np.max(spike_train_amps)}
+                spike_dict[int(i)] = {
+                    "train_start_time": float(train_data_idx[i][0] * sample_rate),
+                    "train_end_time": train_data_idx[i][-1] * sample_rate,
+                    "train_start_obs": train_data_idx[i][0],
+                    "train_end_obs": train_data_idx[i][-1],
+                    "spike_cnt": len(train_spikes_idx[i]),
+                    "train_length_obs_cnt": len(spike_train_amps),
+                    "train_length_obs_s": len(spike_train_amps) / sample_rate,
+                    "train_spike_mean_lengths_s": np.mean(train_spike_lengths)
+                    * sample_rate,
+                    "train_spike_std_length_obs": np.std(train_spike_lengths),
+                    "train_spike_std_length_s": np.std(train_spike_lengths)
+                    * sample_rate,
+                    "train_spike_max_length_obs": np.max(train_spike_lengths),
+                    "train_spike_max_length_s": np.max(train_spike_lengths)
+                    * sample_rate,
+                    "train_spike_min_length_obs": np.min(train_spike_lengths),
+                    "train_spike_min_length_s": np.min(train_spike_lengths)
+                    * sample_rate,
+                    "train_mean_amplitude": np.mean(spike_train_amps),
+                    "train_std_amplitude": np.std(spike_train_amps),
+                    "train_min_amplitude": np.min(spike_train_amps),
+                    "train_max_amplitude": np.max(spike_train_amps),
+                }
 
         return spike_dict
 
+
 # data = np.array([1, 4, 2, 3, 5, 6, 8, 7, 9, 10]).astype(np.float32)
 # results = sliding_descriptive_statistics(data=data, window_sizes=np.array([1.0, 5.0]), sample_rate=2, statistics=typed.List(['mac']))
-
-
-
-
 # t = np.linspace(0, 50, int(44100 * 2.0), endpoint=False)
 # sine_wave = 1.0 * np.sin(2 * np.pi * 1.0 * t).astype(np.float32)
 # TimeseriesFeatureMixin().petrosian_fractal_dimension(data=sine_wave)
 # #1.0000398187022719
 # np.random.shuffle(sine_wave)
 # TimeseriesFeatureMixin().petrosian_fractal_dimension(data=sine_wave)
 # #1.0211625348743218
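# A trailing sketch in the same spirit as the commented examples above, exercising
# sliding_hjort_parameters; the random data, the window sizes, and the 10 Hz sample
# rate are illustrative assumptions. The result is a (3 x n_samples x n_windows)
# float32 array holding activity, mobility, and complexity per window.
# data = np.random.randint(0, 50, (100,)).astype(np.float32)
# TimeseriesFeatureMixin().sliding_hjort_parameters(data=data, window_sizes=np.array([1.0, 2.0]), sample_rate=10)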