From 985c01d78b063123e043f5b838e1716e92485abe Mon Sep 17 00:00:00 2001 From: Simon Nilsson Date: Wed, 2 Aug 2023 10:41:17 -0400 Subject: [PATCH] Add files via upload --- .../feature_extraction_supplement_mixin.py | 194 ++++++++---------- 1 file changed, 89 insertions(+), 105 deletions(-) diff --git a/simba/mixins/feature_extraction_supplement_mixin.py b/simba/mixins/feature_extraction_supplement_mixin.py index a7cca604d..2c0a7236d 100644 --- a/simba/mixins/feature_extraction_supplement_mixin.py +++ b/simba/mixins/feature_extraction_supplement_mixin.py @@ -6,14 +6,15 @@ class FeatureExtractionSupplemental(FeatureExtractionMixin): + def __init__(self): FeatureExtractionMixin.__init__(self) @staticmethod @jit(nopython=True) - def _helper_euclidean_distance_timeseries_change( - distances: np.ndarray, time_windows: np.ndarray, fps: int - ): + def _helper_euclidean_distance_timeseries_change(distances: np.ndarray, + time_windows: np.ndarray, + fps: int): """ Private jitted helper called by ``simba.mixins.feature_extraction_supplemental_mixin.FeatureExtractionSupplemental.euclidean_distance_timeseries_change`` """ @@ -23,21 +24,19 @@ def _helper_euclidean_distance_timeseries_change( shifted_distances = np.copy(distances) shifted_distances[0:frms] = np.nan shifted_distances[frms:] = distances[:-frms] - shifted_distances[np.isnan(shifted_distances)] = distances[ - np.isnan(shifted_distances) - ] + shifted_distances[np.isnan(shifted_distances)] = distances[np.isnan(shifted_distances)] results[:, window_cnt] = distances - shifted_distances return results - def euclidean_distance_timeseries_change( - self, - location_1: np.ndarray, - location_2: np.ndarray, - fps: int, - px_per_mm: float, - time_windows: np.ndarray = np.array([0.2, 0.4, 0.8, 1.6]), - ) -> np.ndarray: + + + def euclidean_distance_timeseries_change(self, + location_1: np.ndarray, + location_2: np.ndarray, + fps: int, + px_per_mm: float, + time_windows: np.ndarray = np.array([0.2, 0.4, 0.8, 1.6])) -> np.ndarray: """ Compute the difference in distance between two points in the current frame versus N.N seconds ago. E.g., computes if two points are traveling away from each other (positive output values) or towards each other @@ -59,18 +58,14 @@ def euclidean_distance_timeseries_change( >>> location_2 = np.random.randint(low=0, high=100, size=(2000, 2)).astype('float32') >>> distances = self.euclidean_distance_timeseries_change(location_1=location_1, location_2=location_2, fps=10, px_per_mm=4.33, time_windows=np.array([0.2, 0.4, 0.8, 1.6])) """ - distances = self.framewise_euclidean_distance( - location_1=location_1, location_2=location_2, px_per_mm=px_per_mm - ) - return self._helper_euclidean_distance_timeseries_change( - distances=distances, fps=fps, time_windows=time_windows - ).astype(int) + distances = self.framewise_euclidean_distance(location_1=location_1, location_2=location_2, px_per_mm=px_per_mm) + return self._helper_euclidean_distance_timeseries_change(distances=distances, fps=fps, time_windows=time_windows).astype(int) @staticmethod @jit(nopython=True) - def timeseries_independent_sample_t( - data: np.ndarray, group_size_s: int, fps: int - ) -> np.ndarray: + def timeseries_independent_sample_t(data: np.ndarray, + group_size_s: int, + fps: int) -> np.ndarray: """ Compute independent-sample t-statistics for sequentially binned values in a time-series. E.g., compute t-test statistics when comparing ``Feature N`` in the current 1s @@ -93,19 +88,17 @@ def timeseries_independent_sample_t( window_size = int(group_size_s * fps) data = np.split(data, list(range(window_size, data.shape[0], window_size))) for cnt, i in enumerate(prange(1, len(data))): - start, end = int((cnt + 1) * window_size), int( - ((cnt + 1) * window_size) + window_size - ) - mean_1, mean_2 = np.mean(data[i - 1]), np.mean(data[i]) - stdev_1, stdev_2 = np.std(data[i - 1]), np.std(data[i]) - results[start:end] = (mean_1 - mean_2) / np.sqrt( - (stdev_1 / data[i - 1].shape[0]) + (stdev_2 / data[i].shape[0]) - ) + start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size) + mean_1, mean_2 = np.mean(data[i-1]), np.mean(data[i]) + stdev_1, stdev_2 = np.std(data[i-1]), np.std(data[i]) + results[start:end] = (mean_1 - mean_2) / np.sqrt((stdev_1 / data[i-1].shape[0]) + (stdev_2 / data[i].shape[0])) return results - def two_sample_ks( - self, data: np.ndarray, group_size_s: int, fps: int - ) -> np.ndarray: + + def two_sample_ks(self, + data: np.ndarray, + group_size_s: int, + fps: int) -> np.ndarray: """ Compute Kolmogorov two-sample statistics for sequentially binned values in a time-series. E.g., compute KS statistics when comparing ``Feature N`` in the current 1s time-window, versus ``Feature N`` in the previous 1s time-window. @@ -123,15 +116,14 @@ def two_sample_ks( window_size, results = int(group_size_s * fps), np.full((data.shape[0]), -1.0) data = np.split(data, list(range(window_size, data.shape[0], window_size))) for cnt, i in enumerate(prange(1, len(data))): - start, end = int((cnt + 1) * window_size), int( - ((cnt + 1) * window_size) + window_size - ) - results[start:end] = stats.ks_2samp( - data1=data[i - 1], data2=data[i] - ).statistic + start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size) + results[start:end] = stats.ks_2samp(data1=data[i-1], data2=data[i]).statistic return results - def shapiro_wilks(self, data: np.ndarray, bin_size_s: int, fps: int) -> np.ndarray: + def shapiro_wilks(self, + data: np.ndarray, + bin_size_s: int, + fps: int) -> np.ndarray: """ Compute Shapiro-Wilks normality statistics for sequentially binned values in a time-series. E.g., compute the normality statistics of ``Feature N`` in each window of ``group_size_s`` seconds. @@ -149,15 +141,15 @@ def shapiro_wilks(self, data: np.ndarray, bin_size_s: int, fps: int) -> np.ndarr window_size, results = int(bin_size_s * fps), np.full((data.shape[0]), -1.0) data = np.split(data, list(range(window_size, data.shape[0], window_size))) for cnt, i in enumerate(prange(1, len(data))): - start, end = int((cnt + 1) * window_size), int( - ((cnt + 1) * window_size) + window_size - ) + start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size) results[start:end] = stats.shapiro(data[i])[0] return results @staticmethod @jit(nopython=True) - def peak_ratio(data: np.ndarray, bin_size_s: int, fps: int): + def peak_ratio(data: np.ndarray, + bin_size_s: int, + fps: int): """ Compute the ratio of peak values relative to number of values within each seqential time-period represented of ``bin_size_s`` seconds. @@ -180,22 +172,17 @@ def peak_ratio(data: np.ndarray, bin_size_s: int, fps: int): window_size, results = int(bin_size_s * fps), np.full((data.shape[0]), -1.0) data = np.split(data, list(range(window_size, data.shape[0], window_size))) for cnt, i in enumerate(prange(len(data))): - start, end = int((cnt + 1) * window_size), int( - ((cnt + 1) * window_size) + window_size - ) - results[start:end] = ( - np.sum( - (data[i] > np.roll(data[i], 1)) & (data[i] > np.roll(data[i], -1)) - ) - / data[i].shape[0] - ) + start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size) + results[start:end] = np.sum((data[i] > np.roll(data[i],1)) & (data[i] > np.roll(data[i],-1))) / data[i].shape[0] return results + @staticmethod @jit(nopython=True) - def rolling_categorical_switches( - data: np.ndarray, time_windows: np.ndarray, fps: int - ) -> np.ndarray: + def rolling_categorical_switches(data: np.ndarray, + time_windows: np.ndarray, + fps: int) -> np.ndarray: + """ Compute the count of in categorical feature switches within rolling windows. @@ -218,7 +205,7 @@ def rolling_categorical_switches( for time_window in prange(time_windows.shape[0]): jump_frms = int(time_windows[time_window] * fps) for current_frm in prange(jump_frms, data.shape[0]): - time_slice = data[current_frm - jump_frms : current_frm] + time_slice = data[current_frm-jump_frms: current_frm] current_value, unique_cnt = time_slice[0], 0 for i in prange(1, time_slice.shape[0]): if time_slice[i] != current_value: @@ -229,7 +216,8 @@ def rolling_categorical_switches( @staticmethod @jit(nopython=True) - def consecutive_time_series_categories_count(data: np.ndarray, fps: int): + def consecutive_time_series_categories_count(data: np.ndarray, + fps: int): """ Compute the count of consecutive milliseconds the feature value has remained static. For example, compute for how long in milleseconds the animal has remained in the current cardinal direction or the @@ -250,21 +238,27 @@ def consecutive_time_series_categories_count(data: np.ndarray, fps: int): results = np.full((data.shape[0]), 0.0) for i in prange(1, data.shape[0]): - if data[i] == data[i - 1]: - results[i] = results[i - 1] + 1 + if data[i] == data[i-1]: + results[i] = results[i-1]+1 else: results[i] = 0 return results / fps + @staticmethod @jit(nopython=True) - def horizontal_vs_vertical_movement( - data: np.ndarray, pixels_per_mm: float, time_windows: np.ndarray, fps: int - ) -> np.ndarray: + def horizontal_vs_vertical_movement(data: np.ndarray, + pixels_per_mm: float, + time_windows: np.ndarray, + fps: int) -> np.ndarray: """ Compute the movement along the x-axis relative to the y-axis in rolling time bins. + .. image:: _static/img/x_vs_y_movement.png + :width: 700 + :align: center + :parameter np.ndarray data: 2d array of size len(frames)x2 with body-part coordinates. :parameter int fps: FPS of the recorded video :parameter float pixels_per_mm: Pixels per millimeter of recorded video. @@ -277,35 +271,20 @@ def horizontal_vs_vertical_movement( for time_window in prange(time_windows.shape[0]): jump_frms = int(time_windows[time_window] * fps) for current_frm in prange(jump_frms, results.shape[0]): - x_movement = ( - np.sum( - np.abs( - np.ediff1d(data[current_frm - jump_frms : current_frm, 0]) - ) - ) - / pixels_per_mm - ) - y_movement = ( - np.sum( - np.abs( - np.ediff1d(data[current_frm - jump_frms : current_frm, 1]) - ) - ) - / pixels_per_mm - ) + x_movement = np.sum(np.abs(np.ediff1d(data[current_frm-jump_frms: current_frm, 0]))) / pixels_per_mm + y_movement = np.sum(np.abs(np.ediff1d(data[current_frm-jump_frms: current_frm, 1]))) / pixels_per_mm results[current_frm][time_window] = x_movement - y_movement return results @staticmethod @jit(nopython=True) - def border_distances( - data: np.ndarray, - pixels_per_mm: float, - img_resolution: np.ndarray, - time_window: float, - fps: int, - ): + def border_distances(data: np.ndarray, + pixels_per_mm: float, + img_resolution: np.ndarray, + time_window: float, + fps: int): + """ Compute the mean distance of key-point to the top, bottom, left, and right sides of the image in rolling time-windows. Uses a straight line. @@ -326,23 +305,22 @@ def border_distances( window_size = int(time_window * fps) for current_frm in prange(window_size, results.shape[0]): distances = np.full((4, window_size, 1), np.nan) - windowed_locs = data[current_frm - window_size : current_frm, :] + windowed_locs = data[current_frm - window_size: current_frm, :] for bp_cnt, bp_loc in enumerate(windowed_locs): distances[0, bp_cnt] = np.linalg.norm(np.array([bp_loc[0], 0]) - bp_loc) - distances[1, bp_cnt] = np.linalg.norm( - np.array([bp_loc[0], img_resolution[0]]) - bp_loc - ) + distances[1, bp_cnt] = np.linalg.norm(np.array([bp_loc[0], img_resolution[0]]) - bp_loc) distances[2, bp_cnt] = np.linalg.norm(np.array([0, bp_loc[1]]) - bp_loc) - distances[3, bp_cnt] = np.linalg.norm( - np.array([0, img_resolution[1]]) - bp_loc - ) + distances[3, bp_cnt] = np.linalg.norm(np.array([0, img_resolution[1]]) - bp_loc) for i in prange(4): results[current_frm][i] = np.mean(distances[i]) / pixels_per_mm return results.astype(np.int32) @staticmethod @jit(nopython=True) - def acceleration(data: np.ndarray, pixels_per_mm: float, fps: int): + def acceleration(data: np.ndarray, + pixels_per_mm: float, + fps: int): + """ Compute acceleration. @@ -365,13 +343,15 @@ def acceleration(data: np.ndarray, pixels_per_mm: float, fps: int): for i in prange(fps, shifted_loc.shape[0]): velocity[i] = np.linalg.norm(shifted_loc[i] - data[i]) / pixels_per_mm for current_frm in prange(fps, velocity.shape[0], fps): - print(current_frm - fps, current_frm, current_frm, current_frm + fps) - prior_window = np.mean(velocity[current_frm - fps : current_frm]) - current_window = np.mean(velocity[current_frm : current_frm + fps]) - results[current_frm : current_frm + fps] = current_window - prior_window + print(current_frm-fps, current_frm, current_frm, current_frm+fps) + prior_window = np.mean(velocity[current_frm-fps: current_frm]) + current_window = np.mean(velocity[current_frm: current_frm+fps]) + results[current_frm:current_frm+fps] = current_window - prior_window return results + + # # # @@ -379,12 +359,13 @@ def acceleration(data: np.ndarray, pixels_per_mm: float, fps: int): # # start = time.time() # nose_loc = np.random.randint(low=0, high=500, size=(231, 2)).astype('float32') -# results = FeatureExtractionSupplemental().horizontal_vs_vertical_movement(data=nose_loc, pixels_per_mm=4.33, fps=10, time_windows=np.array([0.4])) +#results = FeatureExtractionSupplemental().horizontal_vs_vertical_movement(data=nose_loc, pixels_per_mm=4.33, fps=10, time_windows=np.array([0.4])) -# results = FeatureExtractionSupplemental().border_distances(data=nose_loc, pixels_per_mm=4.33, fps=10, time_window=0.2, img_resolution=np.array([600, 400])) +#results = FeatureExtractionSupplemental().border_distances(data=nose_loc, pixels_per_mm=4.33, fps=10, time_window=0.2, img_resolution=np.array([600, 400])) + +#results = FeatureExtractionSupplemental().acceleration(data=nose_loc, pixels_per_mm=4.33, fps=10) -# results = FeatureExtractionSupplemental().acceleration(data=nose_loc, pixels_per_mm=4.33, fps=10) # left_ear_loc = np.random.randint(low=0, high=500, size=(10000, 2)).astype('float32') @@ -401,10 +382,13 @@ def acceleration(data: np.ndarray, pixels_per_mm: float, fps: int): # static_count = FeatureExtractionSupplemental().consecutive_time_series_categories_count(data=rotation.values, fps=10) -# rolling_angular_dispersion = FeatureExtractionSupplemental().rolling_angular_dispersion(data=angle_data, time_windows=np.array([0.4]), fps=10) -# print(time.time() - start) +#rolling_angular_dispersion = FeatureExtractionSupplemental().rolling_angular_dispersion(data=angle_data, time_windows=np.array([0.4]), fps=10) + + + +#print(time.time() - start) # # data = np.random.randint(low=0, high=100, size=(223)).astype('float32') @@ -416,4 +400,4 @@ def acceleration(data: np.ndarray, pixels_per_mm: float, fps: int): # start = time.time() # data = np.random.randint(low=0, high=100, size=(50000000)).astype('float32') # results = FeatureExtractionSupplemental().peak_ratio(data=data, group_size_s=1, fps=10) -# print(time.time() - start) +# print(time.time() - start) \ No newline at end of file