diff --git a/fsspark/fs/fdataframe.py b/fsspark/fs/fdataframe.py
index 3cba18c..21cc06c 100644
--- a/fsspark/fs/fdataframe.py
+++ b/fsspark/fs/fdataframe.py
@@ -10,6 +10,7 @@
 logger = logging.getLogger("pickfeat")
 logger.setLevel(logging.INFO)
 
+
 class FSDataFrame:
     """
     FSDataFrame is a representation of a DataFrame with some functionalities to perform feature selection.
@@ -188,7 +189,7 @@ def _collect_features_as_array(self) -> np.array:
 
         :return: Numpy array
         """
-        sdf = self.get_sdf().select(*self.get_features_names())
+        sdf = self.get_df().select(*self.get_features_names())
         a = np.array(sdf.collect())
         return a
 
@@ -199,8 +200,8 @@ def to_psdf(self) -> DataFrame:
         """
         return self.__df.pandas_api()
 
-    def get_sdf(self) -> DataFrame:
-        return self.__df
+    def get_df(self) -> DataFrame:
+        return self.__df
 
     def get_sample_col_name(self) -> str:
         """
@@ -235,7 +236,7 @@ def _add_row_index(self, index_name: str = '_row_index') -> pd.DataFrame:
         :return: DataFrame with extra column of row indices.
         """
         # Add a new column with unique row indices using a range
-        self.__df[index_name] = range(len(self.__df))
+        self.__df[index_name] = list(range(len(self.__df)))
         return self.__df
 
     def count_features(self) -> int:
         """
@@ -271,7 +272,7 @@ def filter_features(self, features: List[str], keep: bool = True) -> 'FSDataFram
             return self
 
         count_a = self.count_features()
-        sdf = self.get_sdf()
+        sdf = self.get_df()
 
         if keep:
             sdf = sdf.select(
@@ -333,23 +334,14 @@ def scale_features(self, scaler_method: str = 'standard', **kwargs) -> 'FSDataFr
         else:
             raise ValueError("`scaler_method` must be one of: min_max, max_abs, standard or robust.")
 
-        features_col_vector = '_features'
-        scaled_features_vector = '_features_scaled'
+        feature_array = self._features_to_array()
 
-        sdf = self.get_sdf_vector(output_column_vector=features_col_vector)
+        feature_array = (scaler
+                         .fit(feature_array)
+                         .transform(feature_array)
+                         )
 
-        sdf = (scaler
-               .setInputCol(features_col_vector)
-               .setOutputCol(scaled_features_vector)
-               .fit(sdf)
-               .transform(sdf)
-               .drop(features_col_vector)
-               )
-
-        sdf = _disassemble_column_vector(sdf,
-                                         features_cols=self.get_features_names(),
-                                         col_vector_name=scaled_features_vector,
-                                         drop_col_vector=True)
+        sdf = self._array_to_features(feature_array)
 
         return self.update(sdf,
                            self.__sample_col,
@@ -399,9 +391,6 @@ def split_df(self,
         # Return the updated DataFrames
         return self.update(train_df), self.update(test_df)
 
-
-
-
     @classmethod
     def update(cls,
                df: DataFrame,
@@ -420,51 +409,72 @@ def update(cls,
         """
         return cls(df, sample_col, label_col, row_index_col)
 
-def _assemble_column_vector(self,
-                            input_feature_cols: List[str],
-                            output_column_vector: str = 'features',
-                            drop_input_cols: bool = True) -> pd.DataFrame:
-    """
-    Assemble features (columns) from DataFrame into a column of type Numpy array.
-
-    :param drop_input_cols: Boolean flag to drop the input feature columns.
-    :param input_feature_cols: List of feature column names.
-    :param output_column_vector: Name of the output column that will contain the combined vector.
-    :param sdf: Pandas DataFrame
-
-    :return: DataFrame with column of type Numpy array.
-    """
-
-    # Combine the input columns into a single vector (Numpy array)
-    self.__df[output_column_vector] = self.__df[input_feature_cols].apply(lambda row: np.array(row), axis=1)
-
-    # Drop input columns if flag is set to True
-    if drop_input_cols:
-        return self.__df.drop(columns=input_feature_cols)
-    else:
-        return self.__df
-
-def _disassemble_column_vector(self,
-                               features_cols: List[str],
-                               col_vector_name: str,
-                               drop_col_vector: bool = True) -> pd.DataFrame:
-    """
-    Convert a column of Numpy arrays in DataFrame to individual columns (a.k.a features).
-    This is the reverse operation of `_assemble_column_vector`.
-
-    :param features_cols: List of new feature column names.
-    :param col_vector_name: Name of the column that contains the vector (Numpy array).
-    :param drop_col_vector: Boolean flag to drop the original vector column.
-    :return: DataFrame with individual feature columns.
-    """
-
-    # Unpack the vector (Numpy array) into individual columns
-    for i, feature in enumerate(features_cols):
-        self.__df[feature] = self.__df[col_vector_name].apply(lambda x: x[i])
-
-    # Drop the original vector column if needed
-    if drop_col_vector:
-        self.__df = self.__df.drop(columns=[col_vector_name])
+    def _features_to_array(self) -> np.array:
+        """
+        Collect features from FSDataFrame as an array.
+        `Warning`: This method will collect the entire DataFrame into the driver.
+        Use this method on small datasets only (e.g., after filtering or splitting the data).
 
-    return self.__df
+        :return: Numpy array
+        """
+        sdf = self.get_df().select(*self.get_features_names())
+        a = np.array(sdf.collect())
+        return a
 
+    def _array_to_features(self, a: np.array) -> pd.DataFrame:
+        """
+        Convert a Numpy array to a DataFrame with features as columns.
+        :param a: Numpy array
+        :return: Pandas DataFrame
+        """
+        return pd.DataFrame(a, columns=self.get_features_names())
 
+#
+# def _assemble_column_vector(self,
+#                             input_feature_cols: List[str],
+#                             output_column_vector: str = 'features',
+#                             drop_input_cols: bool = True) -> pd.DataFrame:
+#     """
+#     Assemble features (columns) from DataFrame into a column of type Numpy array.
+#
+#     :param drop_input_cols: Boolean flag to drop the input feature columns.
+#     :param input_feature_cols: List of feature column names.
+#     :param output_column_vector: Name of the output column that will contain the combined vector.
+#     :param sdf: Pandas DataFrame
+#
+#     :return: DataFrame with column of type Numpy array.
+#     """
+#
+#     # Combine the input columns into a single vector (Numpy array)
+#     self.__df[output_column_vector] = self.__df[input_feature_cols].apply(lambda row: np.array(row), axis=1)
+#
+#     # Drop input columns if flag is set to True
+#     if drop_input_cols:
+#         return self.__df.drop(columns=input_feature_cols)
+#     else:
+#         return self.__df
+#
+#
+# def _disassemble_column_vector(self,
+#                                features_cols: List[str],
+#                                col_vector_name: str,
+#                                drop_col_vector: bool = True) -> pd.DataFrame:
+#     """
+#     Convert a column of Numpy arrays in DataFrame to individual columns (a.k.a features).
+#     This is the reverse operation of `_assemble_column_vector`.
+#
+#     :param features_cols: List of new feature column names.
+#     :param col_vector_name: Name of the column that contains the vector (Numpy array).
+#     :param drop_col_vector: Boolean flag to drop the original vector column.
+#     :return: DataFrame with individual feature columns.
+#     """
+#
+#     # Unpack the vector (Numpy array) into individual columns
+#     for i, feature in enumerate(features_cols):
+#         self.__df[feature] = self.__df[col_vector_name].apply(lambda x: x[i])
+#
+#     # Drop the original vector column if needed
+#     if drop_col_vector:
+#         self.__df = self.__df.drop(columns=[col_vector_name])
+#
+#     return self.__df
diff --git a/fsspark/fs/univariate.py b/fsspark/fs/univariate.py
index 1103762..713cd26 100644
--- a/fsspark/fs/univariate.py
+++ b/fsspark/fs/univariate.py
@@ -1,133 +1,110 @@
 import logging
 from typing import Dict, List
 
-import pyspark.sql.functions as f
-from pyspark.ml.feature import UnivariateFeatureSelector
-
-from fsspark.fs.constants import ANOVA, UNIVARIATE_CORRELATION, F_REGRESSION, UNIVARIATE_METHODS
-
-from fsspark.fs.core import FSDataFrame
-from fsspark.utils.generic import tag
+import pandas as pd
+from sklearn.feature_selection import SelectKBest, f_classif, f_regression
+from scipy.stats import pearsonr
 
 logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
-logger = logging.getLogger("FSSPARK:UNIVARIATE")
+logger = logging.getLogger("FS:UNIVARIATE")
 logger.setLevel(logging.INFO)
 
 
-@tag("spark implementation")
-def compute_univariate_corr(fsdf: FSDataFrame) -> Dict[str, float]:
+def compute_univariate_corr(df: pd.DataFrame, features: List[str], label: str) -> Dict[str, float]:
     """
     Compute the correlation coefficient between every column (features) in the input DataFrame
     and the label (class).
 
-    :param fsdf: Input FSDataFrame
+    :param df: Input DataFrame
+    :param features: List of feature column names
+    :param label: Label column name
 
     :return: Return dict {feature -> corr}
     """
-
-    sdf = fsdf.get_sdf()
-    features = fsdf.get_features_names()
-    label = fsdf.get_label_col_name()
-
-    u_corr = sdf.select([f.abs(f.corr(sdf[c], sdf[label])).alias(c) for c in features])
-
-    return u_corr.first().asDict()
+    correlations = {feature: abs(df[feature].corr(df[label])) for feature in features}
+    return correlations
 
 
-def univariate_correlation_selector(fsdf: FSDataFrame,
-                                    corr_threshold: float = 0.3) -> List[str]:
+def univariate_correlation_selector(df: pd.DataFrame, features: List[str], label: str,
+                                    corr_threshold: float = 0.3) -> List[str]:
     """
-    Select features based on its correlation with a label (class), if corr value is less than a specified threshold.
-    Expected both features and label to be of type numeric.
+    Select features based on their correlation with a label (class), if the correlation value is less than the specified threshold.
 
-    :param fsdf: FSDataFrame
-    :param corr_threshold: Maximal correlation threshold allowed between feature and label.
+    :param df: Input DataFrame
+    :param features: List of feature column names
+    :param label: Label column name
+    :param corr_threshold: Maximum allowed correlation threshold
 
-    :return: List of selected features names
+    :return: List of selected feature names
     """
-    d = compute_univariate_corr(fsdf)
-    selected_features = [k for k in d.keys() if d.get(k) <= corr_threshold]
-
+    correlations = compute_univariate_corr(df, features, label)
+    selected_features = [feature for feature, corr in correlations.items() if corr <= corr_threshold]
     return selected_features
 
 
-@tag("spark implementation")
-def univariate_selector(fsdf: FSDataFrame,
-                        label_type: str = 'categorical',
-                        selection_mode: str = 'percentile',
-                        selection_threshold: float = 0.8,
-                        **kwargs) -> List[str]:
+def univariate_selector(df: pd.DataFrame, features: List[str], label: str, label_type: str = 'categorical',
+                        selection_mode: str = 'percentile', selection_threshold: float = 0.8) -> List[str]:
     """
-    Wrapper for `UnivariateFeatureSelector`.
-    See https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.UnivariateFeatureSelector.html
-
-    Only continues features are supported. If label is categorical, ANOVA test is used. If label is of type continues
-    then an F-regression test is used.
+    Wrapper for scikit-learn's `SelectKBest` feature selector.
+    If the label is categorical, ANOVA test is used; if continuous, F-regression test is used.
 
-    :param fsdf: Input FSDataFrame
-    :param label_type: Type of label. Possible values are 'categorical' or 'continuous'.
-    :param selection_mode: Mode for feature selection. Possible values are 'numTopFeatures' or 'percentile'.
-    :param selection_threshold: Number of features to select or the percentage of features to select.
+    :param df: Input DataFrame
+    :param features: List of feature column names
+    :param label: Label column name
+    :param label_type: Type of label ('categorical' or 'continuous')
+    :param selection_mode: Mode for feature selection ('percentile' or 'k_best')
+    :param selection_threshold: Fraction of the top score ('percentile' mode) or number of features to select ('k_best' mode)
 
-    :return: List of selected features names
+    :return: List of selected feature names
     """
-    vector_col_name = 'features'
-    sdf = fsdf.get_sdf_vector(output_column_vector=vector_col_name)
-    label = fsdf.get_label_col_name()
+    X = df[features].values
+    y = df[label].values
 
-    # set selector
     if label_type == 'categorical':
-        # TODO: print msg to logger with the method being used here...
-        print("ANOVA (F-classification) univariate feature selection")
+        logger.info("ANOVA (F-classification) univariate feature selection")
+        selector = SelectKBest(score_func=f_classif)
     elif label_type == 'continuous':
-        # TODO: print msg to logger with the method being used here...
-        print("F-value (F-regression) univariate feature selection")
+        logger.info("F-value (F-regression) univariate feature selection")
+        selector = SelectKBest(score_func=f_regression)
+    else:
+        raise ValueError("`label_type` must be one of 'categorical' or 'continuous'")
+
+    if selection_mode == 'percentile':
+        selector.set_params(k='all')  # score all features, then keep those within `selection_threshold` of the top score
+        selector.fit(X, y)
+        scores = selector.scores_
+        selected_indices = [i for i, score in enumerate(scores) if score >= selection_threshold * max(scores)]
     else:
-        raise ValueError("`label_type` must be one of categorical or continuous")
-
-    selector = UnivariateFeatureSelector(**kwargs)
-    (selector
-     .setLabelType(label_type)
-     .setFeaturesCol(vector_col_name)
-     .setFeatureType("continuous")
-     .setOutputCol("selectedFeatures")
-     .setLabelCol(label)
-     .setSelectionMode(selection_mode)
-     .setSelectionThreshold(selection_threshold)
-     )
-
-    model = selector.fit(sdf)
-    selected_features_indices = model.selectedFeatures
-    selected_features = fsdf.get_features_by_index(selected_features_indices)
+        selector.set_params(k=int(selection_threshold))
+        selector.fit(X, y)
+        selected_indices = selector.get_support(indices=True)
+        selected_features = [features[i] for i in selected_indices]
 
     return selected_features
 
 
-@tag("spark implementation")
-def univariate_filter(fsdf: FSDataFrame,
-                      univariate_method: str = 'u_corr',
-                      **kwargs) -> FSDataFrame:
+def univariate_filter(df: pd.DataFrame, features: List[str], label: str, univariate_method: str = 'u_corr',
+                      **kwargs) -> pd.DataFrame:
     """
     Filter features after applying a univariate feature selector method.
 
-    :param fsdf: Input FSDataFrame
-    :param univariate_method: Univariate selector method.
-                              Possible values are 'u_corr', 'anova' (categorical label)
-                              or 'f_regression' (continuous label).
+    :param df: Input DataFrame
+    :param features: List of feature column names
+    :param label: Label column name
+    :param univariate_method: Univariate selector method ('u_corr', 'anova', 'f_regression')
 
-    :return: Filtered FSDataFrame
+    :return: Filtered DataFrame with selected features
     """
-    if univariate_method == ANOVA:
-        selected_features = univariate_selector(fsdf, label_type='categorical', **kwargs)
-    elif univariate_method == F_REGRESSION:
-        selected_features = univariate_selector(fsdf, label_type='continuous', **kwargs)
-    elif univariate_method == UNIVARIATE_CORRELATION:
-        selected_features = univariate_correlation_selector(fsdf, **kwargs)
+    if univariate_method == 'anova':
+        selected_features = univariate_selector(df, features, label, label_type='categorical', **kwargs)
+    elif univariate_method == 'f_regression':
+        selected_features = univariate_selector(df, features, label, label_type='continuous', **kwargs)
+    elif univariate_method == 'u_corr':
+        selected_features = univariate_correlation_selector(df, features, label, **kwargs)
     else:
-        raise ValueError(f"Univariate method {univariate_method} not supported. "
-                         f"Expected one of {UNIVARIATE_METHODS.keys()}")
+        raise ValueError(f"Univariate method {univariate_method} not supported.")
 
-    logger.info(f"Applying univariate filter {univariate_method}.")
+    logger.info(f"Applying univariate filter using method: {univariate_method}")
 
-    return fsdf.filter_features(selected_features, keep=True)
+    return df[selected_features + [label]]  # Return DataFrame with selected features and label column
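
The fdataframe.py hunks above replace the Spark vector assemble/disassemble steps with a NumPy array round trip. A minimal sketch of that flow outside FSDataFrame, using plain pandas and scikit-learn; the column names, values, and the choice of StandardScaler for scaler_method='standard' are assumptions for illustration, not taken from the diff:

    import pandas as pd
    from sklearn.preprocessing import StandardScaler  # assumed scaler for scaler_method='standard'

    # Made-up feature matrix standing in for the FSDataFrame feature columns.
    df = pd.DataFrame({'feat_1': [1.0, 2.0, 3.0], 'feat_2': [10.0, 20.0, 30.0]})
    feature_names = ['feat_1', 'feat_2']

    # Equivalent of _features_to_array(): collect the features as a NumPy array.
    feature_array = df[feature_names].to_numpy()

    # Same fit/transform pattern as the updated scale_features() body.
    scaler = StandardScaler()
    feature_array = scaler.fit(feature_array).transform(feature_array)

    # Equivalent of _array_to_features(): rebuild a DataFrame with the original feature columns.
    df_scaled = pd.DataFrame(feature_array, columns=feature_names)
    print(df_scaled)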
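
Usage sketch for the pandas-based univariate_filter introduced in univariate.py. The toy DataFrame, its column names, and the threshold value are invented; the function name and keyword arguments come from the updated module:

    import pandas as pd
    from fsspark.fs.univariate import univariate_filter

    # Toy dataset: three numeric features plus a numeric label (invented values).
    df = pd.DataFrame({
        'feat_1': [0.1, 0.4, 0.35, 0.8],
        'feat_2': [1.0, 0.9, 1.1, 1.2],
        'feat_3': [10.0, 20.0, 15.0, 30.0],
        'label': [0, 1, 0, 1],
    })

    # 'u_corr' keeps features whose absolute correlation with the label is <= corr_threshold.
    filtered = univariate_filter(df,
                                 features=['feat_1', 'feat_2', 'feat_3'],
                                 label='label',
                                 univariate_method='u_corr',
                                 corr_threshold=0.3)
    print(filtered.columns.tolist())  # selected features plus the label column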