diff --git a/fslite/fs/constants.py b/fslite/fs/constants.py index 8937465..87ab22b 100644 --- a/fslite/fs/constants.py +++ b/fslite/fs/constants.py @@ -1,72 +1,52 @@ """ This file contains a list of constants used in the feature selection and machine learning methods. """ + from typing import Dict, List, Union FS_METHODS = { - 'univariate': { - "title": 'Univariate Feature Selection', + "univariate": { + "title": "Univariate Feature Selection", "methods": [ { - 'name': 'anova', - 'description': 'Univariate ANOVA feature selection (f-classification)' - }, - { - 'name': 'u_corr', - 'description': 'Univariate correlation' + "name": "anova", + "description": "Univariate ANOVA feature selection (f-classification)", }, - { - 'name': 'f_regression', - 'description': 'Univariate f-regression' - } - ] + {"name": "u_corr", "description": "Univariate correlation"}, + {"name": "f_regression", "description": "Univariate f-regression"}, + ], }, - 'multivariate': { - "title": 'Multivariate Feature Selection', + "multivariate": { + "title": "Multivariate Feature Selection", "methods": [ - { - 'name': 'm_corr', - 'description': 'Multivariate Correlation' - }, - { - 'name': 'variance', - 'description': 'Multivariate Variance' - } - ] + {"name": "m_corr", "description": "Multivariate Correlation"}, + {"name": "variance", "description": "Multivariate Variance"}, + ], }, - 'ml': { - "title": 'Machine Learning Wrapper', + "ml": { + "title": "Machine Learning Wrapper", "methods": [ + {"name": "rf_binary", "description": "Random Forest Binary Classifier"}, + {"name": "lsvc_binary", "description": "Linear SVC Binary Classifier"}, { - 'name': 'rf_binary', - 'description': 'Random Forest Binary Classifier' - }, - { - 'name': 'lsvc_binary', - 'description': 'Linear SVC Binary Classifier' + "name": "fm_binary", + "description": "Factorization Machine Binary Classifier", }, { - 'name': 'fm_binary', - 'description': 'Factorization Machine Binary Classifier' + "name": "rf_multilabel", + "description": "Random Forest Multi-label Classifier", }, { - 'name': 'rf_multilabel', - 'description': 'Random Forest Multi-label Classifier' + "name": "lg_multilabel", + "description": "Logistic Regression Multi-label Classifier", }, + {"name": "rf_regression", "description": "Random Forest Regression"}, { - 'name': 'lg_multilabel', - 'description': 'Logistic Regression Multi-label Classifier' + "name": "fm_regression", + "description": "Factorization Machine Regression", }, - { - 'name': 'rf_regression', - 'description': 'Random Forest Regression' - }, - { - 'name': 'fm_regression', - 'description': 'Factorization Machine Regression' - } - ] - } + ], + }, } @@ -77,6 +57,7 @@ def get_fs_methods(): """ return FS_METHODS + def get_fs_method_details(method_name: str) -> Union[Dict, None]: """ Get the details of the feature selection method, this function search in all-methods definitions @@ -87,19 +68,19 @@ def get_fs_method_details(method_name: str) -> Union[Dict, None]: """ for method_type in FS_METHODS: - for method in FS_METHODS[method_type]['methods']: - if method['name'].lower() == method_name.lower(): + for method in FS_METHODS[method_type]["methods"]: + if method["name"].lower() == method_name.lower(): return method return None + def get_fs_univariate_methods() -> List: """ Get the list of univariate methods implemented in the library :return: list """ - univariate_methods = FS_METHODS['univariate'] - univariate_names = [method["name"] for method in univariate_methods["methods"]] - return univariate_names + return get_fs_method_by_class["univariate"] + def is_valid_univariate_method(method_name: str) -> bool: """ @@ -113,3 +94,12 @@ def is_valid_univariate_method(method_name: str) -> bool: return False +def get_fs_method_by_class(fs_class: str) -> List: + """ + Get the FS method supported for a given FS class, for example, univariate + :param fs_class + :return FS List + """ + fs_methods = FS_METHODS[fs_class] + fs_names = [method["name"] for method in fs_methods["methods"]] + return fs_names diff --git a/fslite/fs/fdataframe.py b/fslite/fs/fdataframe.py index 8748c05..3553014 100644 --- a/fslite/fs/fdataframe.py +++ b/fslite/fs/fdataframe.py @@ -7,7 +7,13 @@ import psutil from pandas import DataFrame from scipy import sparse -from sklearn.preprocessing import MinMaxScaler, MaxAbsScaler, StandardScaler, RobustScaler, LabelEncoder +from sklearn.preprocessing import ( + MinMaxScaler, + MaxAbsScaler, + StandardScaler, + RobustScaler, + LabelEncoder, +) logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s") logger = logging.getLogger("pickfeat") @@ -30,13 +36,16 @@ class FSDataFrame: [...] """ + def __init__( - self, - df: pd.DataFrame, - sample_col: Optional[str] = None, - label_col: Optional[str] = None, - sparse_threshold: float = 0.7, # Threshold for sparsity - memory_threshold: Optional[float] = 0.75 # Proportion of system memory to use for dense arrays + self, + df: pd.DataFrame, + sample_col: Optional[str] = None, + label_col: Optional[str] = None, + sparse_threshold: float = 0.7, # Threshold for sparsity + memory_threshold: Optional[ + float + ] = 0.75, # Proportion of system memory to use for dense arrays ): """ Create an instance of FSDataFrame. @@ -60,7 +69,9 @@ def __init__( # Handle sample column if sample_col: if sample_col not in df.columns: - raise ValueError(f"Sample column '{sample_col}' not found in DataFrame.") + raise ValueError( + f"Sample column '{sample_col}' not found in DataFrame." + ) self.__sample_col = sample_col self.__samples = df[sample_col].tolist() columns_to_drop.append(sample_col) @@ -105,19 +116,27 @@ def __init__( if sparsity > sparse_threshold: if dense_matrix_size < memory_threshold * available_memory: # Use dense matrix if enough memory is available - logging.info(f"Data is sparse (sparsity={sparsity:.2f}) but enough memory available. " - f"Using a dense matrix.") + logging.info( + f"Data is sparse (sparsity={sparsity:.2f}) but enough memory available. " + f"Using a dense matrix." + ) self.__matrix = numerical_df.to_numpy(dtype=np.float32) self.__is_sparse = False else: # Use sparse matrix due to memory constraints - logging.info(f"Data is sparse (sparsity={sparsity:.2f}), memory insufficient for dense matrix. " - f"Using a sparse matrix representation.") - self.__matrix = sparse.csr_matrix(numerical_df.to_numpy(dtype=np.float32)) + logging.info( + f"Data is sparse (sparsity={sparsity:.2f}), memory insufficient for dense matrix. " + f"Using a sparse matrix representation." + ) + self.__matrix = sparse.csr_matrix( + numerical_df.to_numpy(dtype=np.float32) + ) self.__is_sparse = True else: # Use dense matrix since it's not sparse - logging.info(f"Data is not sparse (sparsity={sparsity:.2f}), using a dense matrix.") + logging.info( + f"Data is not sparse (sparsity={sparsity:.2f}), using a dense matrix." + ) self.__matrix = numerical_df.to_numpy(dtype=np.float32) self.__is_sparse = False @@ -159,7 +178,7 @@ def count_instances(self) -> int: """ return self.__matrix.shape[0] - def scale_features(self, scaler_method: str = 'standard', **kwargs) -> bool: + def scale_features(self, scaler_method: str = "standard", **kwargs) -> bool: """ Scales features in the SDataFrame using a specified method. @@ -167,16 +186,18 @@ def scale_features(self, scaler_method: str = 'standard', **kwargs) -> bool: :return: FSDataFrame with scaled features. """ - if scaler_method == 'min_max': + if scaler_method == "min_max": scaler = MinMaxScaler(**kwargs) - elif scaler_method == 'max_abs': + elif scaler_method == "max_abs": scaler = MaxAbsScaler(**kwargs) - elif scaler_method == 'standard': + elif scaler_method == "standard": scaler = StandardScaler(**kwargs) - elif scaler_method == 'robust': + elif scaler_method == "robust": scaler = RobustScaler(**kwargs) else: - raise ValueError("`scaler_method` must be one of: min_max, max_abs, standard or robust.") + raise ValueError( + "`scaler_method` must be one of: min_max, max_abs, standard or robust." + ) # TODO: Scale only the features for now, we have to investigate if we scale categorical variables self.__matrix = scaler.fit_transform(self.__matrix) @@ -192,7 +213,7 @@ def get_scaled_method(self): def is_sparse(self): return self.__is_sparse - def select_features_by_index(self, feature_indexes: List[int]) -> 'FSDataFrame': + def select_features_by_index(self, feature_indexes: List[int]) -> "FSDataFrame": """ Keep only the specified features (by index) and return an updated instance of FSDataFrame. @@ -216,7 +237,9 @@ def select_features_by_index(self, feature_indexes: List[int]) -> 'FSDataFrame': updated_df[self.__label_col] = self.__labels # Return a new instance of FSDataFrame with the updated data - return FSDataFrame(updated_df, sample_col=self.__sample_col, label_col=self.__label_col) + return FSDataFrame( + updated_df, sample_col=self.__sample_col, label_col=self.__label_col + ) def to_pandas(self) -> DataFrame: """ @@ -241,9 +264,9 @@ def to_pandas(self) -> DataFrame: return df - def split_df(self, - label_type_cat: bool = True, - split_training_factor: float = 0.7) -> Tuple['FSDataFrame', 'FSDataFrame']: + def split_df( + self, label_type_cat: bool = True, split_training_factor: float = 0.7 + ) -> Tuple["FSDataFrame", "FSDataFrame"]: """ Split DataFrame into training and test dataset. It will generate a nearly class-balanced training @@ -284,4 +307,3 @@ def split_df(self, # # # Return the updated DataFrames # return self.update(train_df), self.update(test_df) - diff --git a/fslite/fs/methods.py b/fslite/fs/methods.py index 4ceeb16..7fbb6c6 100644 --- a/fslite/fs/methods.py +++ b/fslite/fs/methods.py @@ -1,8 +1,7 @@ from abc import ABC, abstractmethod from typing import List, Type, Union, Tuple, Optional, Dict, Any -from fslite.fs.constants import (ML_METHODS, UNIVARIATE_METHODS, - MULTIVARIATE_METHODS) +from fslite.fs.constants import ML_METHODS, UNIVARIATE_METHODS, MULTIVARIATE_METHODS from fslite.fs.core import FSDataFrame from fslite.fs.ml import MLCVModel from fslite.fs.multivariate import multivariate_filter @@ -16,12 +15,10 @@ class FSMethod(ABC): valid_methods: Tuple[str] - def __init__(self, - fs_method, - **kwargs): + def __init__(self, fs_method, **kwargs): """ Initialize the feature selection method with the specified parameters. - + :param fs_method: The feature selection method to be used. :param kwargs: Additional keyword arguments for the feature selection method. """ @@ -140,9 +137,7 @@ def select_features(self, fsdf) -> FSDataFrame: The selected features. """ - return univariate_filter( - fsdf, univariate_method=self.fs_method, **self.kwargs - ) + return univariate_filter(fsdf, univariate_method=self.fs_method, **self.kwargs) def __str__(self): return f"FSUnivariate(method={self.fs_method}, kwargs={self.kwargs})" @@ -233,12 +228,14 @@ class FSMLMethod(FSMethod): valid_methods = list(ML_METHODS.keys()) _ml_model: MLCVModel = None - def __init__(self, - fs_method: str, - rfe: bool = False, - rfe_iterations: int = 3, - percent_to_keep: float = 0.90, - **kwargs): + def __init__( + self, + fs_method: str, + rfe: bool = False, + rfe_iterations: int = 3, + percent_to_keep: float = 0.90, + **kwargs, + ): """ Initialize the machine learning feature selection method with the specified parameters. @@ -251,10 +248,14 @@ def __init__(self, self.validate_method(fs_method) # set the estimator, grid and cv parameters (or none if not provided) - self.estimator_params = kwargs.get('estimator_params', None) # estimator parameters - self.evaluator_params = kwargs.get('evaluator_params', None) # evaluator parameters - self.grid_params = kwargs.get('grid_params', None) # grid parameters - self.cv_params = kwargs.get('cv_params', None) # cross-validation parameters + self.estimator_params = kwargs.get( + "estimator_params", None + ) # estimator parameters + self.evaluator_params = kwargs.get( + "evaluator_params", None + ) # evaluator parameters + self.grid_params = kwargs.get("grid_params", None) # grid parameters + self.cv_params = kwargs.get("cv_params", None) # cross-validation parameters # set the machine learning model self._ml_model = self._set_ml_model() @@ -265,7 +266,9 @@ def __init__(self, self.rfe_iterations = rfe_iterations # performance metrics - self.rfe_training_metric: list = [] # performance metrics on training for each rfe iteration + self.rfe_training_metric: list = ( + [] + ) # performance metrics on training for each rfe iteration self.training_metric = None # performance metrics on training (final model) self.testing_metric = None # performance metrics on testing (final model) @@ -310,7 +313,7 @@ def _set_ml_model(self): estimator_params=self.estimator_params, evaluator_params=self.evaluator_params, grid_params=self.grid_params, - cv_params=self.cv_params + cv_params=self.cv_params, ) return self._ml_model @@ -326,8 +329,8 @@ def _fit_and_filter(self, df: FSDataFrame) -> FSDataFrame: # get feature based on the (percentile) threshold provided # expected a dataframe sorted by scores in descending order selected_features = feature_scores.iloc[ - :int(self.percent_to_keep * len(feature_scores)) - ]['feature_index'] + : int(self.percent_to_keep * len(feature_scores)) + ]["feature_index"] return df.filter_features_by_index(selected_features, keep=True) @@ -343,17 +346,23 @@ def select_features(self, fsdf: FSDataFrame) -> FSDataFrame: """ if fsdf is None or fsdf.count_features() == 0 or fsdf.count_instances() == 0: - raise ValueError("The data frame is empty or does not contain any features.") + raise ValueError( + "The data frame is empty or does not contain any features." + ) fsdf = self._fit_and_filter(fsdf) # Recursive feature elimination if self.rfe: for iteration in range(self.rfe_iterations): - print(f"RFE: running {iteration + 1} of {self.rfe_iterations} iterations...") + print( + f"RFE: running {iteration + 1} of {self.rfe_iterations} iterations..." + ) fsdf = self._fit_and_filter(fsdf) # collect the performance metrics on training for every rfe iteration - self.rfe_training_metric.append(self._ml_model.get_eval_metric_on_training()) + self.rfe_training_metric.append( + self._ml_model.get_eval_metric_on_training() + ) # get the final performance metric on training self.training_metric = self._ml_model.get_eval_metric_on_training() @@ -384,7 +393,9 @@ def get_eval_metric_on_training_rfe(self): The evaluation metric on the training data for each RFE iteration. """ if self.rfe_training_metric is None: - raise ValueError("No training metric is available. Run the select_features method first.") + raise ValueError( + "No training metric is available. Run the select_features method first." + ) return self.rfe_training_metric def get_eval_metric_on_training(self): @@ -395,7 +406,9 @@ def get_eval_metric_on_training(self): The evaluation metric on the training data. """ if self.training_metric is None: - raise ValueError("No training metric is available. Run the select_features method first.") + raise ValueError( + "No training metric is available. Run the select_features method first." + ) return self.training_metric def get_eval_metric_on_testing(self, fsdf: FSDataFrame): @@ -410,7 +423,9 @@ def get_eval_metric_on_testing(self, fsdf: FSDataFrame): """ if fsdf is None or fsdf.count_features() == 0 or fsdf.count_instances() == 0: - raise ValueError("The testing data frame is empty or does not contain any features.") + raise ValueError( + "The testing data frame is empty or does not contain any features." + ) # evaluate the model on the testing data eval_metric = self._ml_model.get_eval_metric_on_testing(fsdf) @@ -427,7 +442,9 @@ def get_feature_scores(self): """ if self.feature_scores is None: - raise ValueError("Feature scores are not available. Run the feature selection method first.") + raise ValueError( + "Feature scores are not available. Run the feature selection method first." + ) return self.feature_scores @@ -452,14 +469,18 @@ class FSPipeline: selected_features = fs_pipeline.select_features(fsdf) """ - _valid_methods: List[Type[Union[FSUnivariate, FSMultivariate, FSMLMethod]]] = [FSUnivariate, - FSMultivariate, - FSMLMethod] + _valid_methods: List[Type[Union[FSUnivariate, FSMultivariate, FSMLMethod]]] = [ + FSUnivariate, + FSMultivariate, + FSMLMethod, + ] - def __init__(self, - df_training: FSDataFrame, - df_testing: Optional[FSDataFrame], - fs_stages: List[Union[FSUnivariate, FSMultivariate, FSMLMethod]]): + def __init__( + self, + df_training: FSDataFrame, + df_testing: Optional[FSDataFrame], + fs_stages: List[Union[FSUnivariate, FSMultivariate, FSMLMethod]], + ): """ Initialize the feature selection pipeline with the specified feature selection methods. @@ -482,15 +503,23 @@ def validate_methods(self): """ # check if the pipeline contains at least one feature selection method if len(self.fs_stages) == 0: - raise ValueError("The pipeline must contain at least one feature selection method.") + raise ValueError( + "The pipeline must contain at least one feature selection method." + ) # check if the feature selection methods are valid - if not all(isinstance(method, tuple(self._valid_methods)) for method in self.fs_stages): - raise InvalidMethodError(f"Invalid feature selection method. " - f"Accepted methods are {', '.join([str(m) for m in self._valid_methods])}") + if not all( + isinstance(method, tuple(self._valid_methods)) for method in self.fs_stages + ): + raise InvalidMethodError( + f"Invalid feature selection method. " + f"Accepted methods are {', '.join([str(m) for m in self._valid_methods])}" + ) # check if only one ML method is used in the pipeline - ml_methods = [method for method in self.fs_stages if isinstance(method, FSMLMethod)] + ml_methods = [ + method for method in self.fs_stages if isinstance(method, FSMLMethod) + ] if len(ml_methods) > 1: raise ValueError("Only one ML method is allowed in the pipeline.") @@ -509,7 +538,9 @@ def run(self) -> Dict[str, Any]: self.pipeline_results.update(n_stages=n_stages) for i, method in enumerate(self.fs_stages): - print(f"Running stage {i + 1} of {n_stages} of the feature selection pipeline: {method}") + print( + f"Running stage {i + 1} of {n_stages} of the feature selection pipeline: {method}" + ) if isinstance(method, FSMLMethod): fsdf_tmp = method.select_features(fsdf_tmp) @@ -518,8 +549,12 @@ def run(self) -> Dict[str, Any]: self.pipeline_results.update(rfe_iterations=method.rfe_iterations) self.pipeline_results.update(feature_scores=method.get_feature_scores()) self.pipeline_results.update(eval_metric=method.get_eval_metric_name()) - self.pipeline_results.update(rfe_training_metric=method.get_eval_metric_on_training_rfe()) - self.pipeline_results.update(training_metric=method.get_eval_metric_on_training()) + self.pipeline_results.update( + rfe_training_metric=method.get_eval_metric_on_training_rfe() + ) + self.pipeline_results.update( + training_metric=method.get_eval_metric_on_training() + ) if self.df_testing is not None: @@ -530,7 +565,9 @@ def run(self) -> Dict[str, Any]: else: fsdf_tmp = method.select_features(fsdf_tmp) - self.pipeline_results.update(n_initial_features=self.df_training.count_features()) + self.pipeline_results.update( + n_initial_features=self.df_training.count_features() + ) self.pipeline_results.update(n_selected_features=fsdf_tmp.count_features()) return self.pipeline_results diff --git a/fslite/fs/ml.py b/fslite/fs/ml.py index b6ea39d..bf97fae 100644 --- a/fslite/fs/ml.py +++ b/fslite/fs/ml.py @@ -4,34 +4,57 @@ for feature selection (e.g., rank by feature importance) and prediction. """ + import warnings from typing import List, Any, Dict, Optional, Union import pandas as pd from pyspark.ml import Estimator, Model -from pyspark.ml.classification import (RandomForestClassificationModel, - LinearSVCModel, - RandomForestClassifier, - LinearSVC, LogisticRegression, LogisticRegressionModel) -from pyspark.ml.evaluation import (Evaluator, - BinaryClassificationEvaluator, - MulticlassClassificationEvaluator, - RegressionEvaluator) +from pyspark.ml.classification import ( + RandomForestClassificationModel, + LinearSVCModel, + RandomForestClassifier, + LinearSVC, + LogisticRegression, + LogisticRegressionModel, +) +from pyspark.ml.evaluation import ( + Evaluator, + BinaryClassificationEvaluator, + MulticlassClassificationEvaluator, + RegressionEvaluator, +) from pyspark.ml.regression import RandomForestRegressionModel, RandomForestRegressor -from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel, Param - -from fslite.fs.constants import (RF_BINARY, - LSVC_BINARY, - FM_BINARY, - RF_MULTILABEL, - LR_MULTILABEL, - RF_REGRESSION, - FM_REGRESSION, - ML_METHODS) +from pyspark.ml.tuning import ( + CrossValidator, + ParamGridBuilder, + CrossValidatorModel, + Param, +) + +from fslite.fs.constants import ( + RF_BINARY, + LSVC_BINARY, + FM_BINARY, + RF_MULTILABEL, + LR_MULTILABEL, + RF_REGRESSION, + FM_REGRESSION, + ML_METHODS, +) from fslite.fs.core import FSDataFrame -ESTIMATORS_CLASSES = [RandomForestClassifier, RandomForestRegressionModel, LinearSVC, LogisticRegression] -EVALUATORS_CLASSES = [BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator] +ESTIMATORS_CLASSES = [ + RandomForestClassifier, + RandomForestRegressionModel, + LinearSVC, + LogisticRegression, +] +EVALUATORS_CLASSES = [ + BinaryClassificationEvaluator, + MulticlassClassificationEvaluator, + RegressionEvaluator, +] # Define an abstract class that allow to create a factory of models @@ -52,18 +75,24 @@ class MLCVModel: _best_model: Model = None _fsdf: FSDataFrame = None - def __init__(self, - estimator: Union[RandomForestClassifier | - RandomForestRegressionModel | - LinearSVC | - LogisticRegression], - evaluator: Union[BinaryClassificationEvaluator | - MulticlassClassificationEvaluator | - RegressionEvaluator], - estimator_params: Optional[Dict[str, Any]] = None, - evaluator_params: Optional[Dict[str, Any]] = None, - grid_params: Optional[Dict[str, List[Any]]] = None, - cv_params: Optional[Dict[str, Any]] = None): + def __init__( + self, + estimator: Union[ + RandomForestClassifier + | RandomForestRegressionModel + | LinearSVC + | LogisticRegression + ], + evaluator: Union[ + BinaryClassificationEvaluator + | MulticlassClassificationEvaluator + | RegressionEvaluator + ], + estimator_params: Optional[Dict[str, Any]] = None, + evaluator_params: Optional[Dict[str, Any]] = None, + grid_params: Optional[Dict[str, List[Any]]] = None, + cv_params: Optional[Dict[str, Any]] = None, + ): """ Initializes the MLModel with optional estimator, evaluator, and parameter specifications. """ @@ -96,7 +125,9 @@ def _initialize_model(self): # Initialize and set cross-validator parameters self._set_cross_validator() - def _parse_grid_params(self, grid_params: Dict[str, List[Any]]) -> List[Dict[Param, Any]]: + def _parse_grid_params( + self, grid_params: Dict[str, List[Any]] + ) -> List[Dict[Param, Any]]: """ Parse the grid parameters to create a list of dictionaries. @@ -108,10 +139,12 @@ def _parse_grid_params(self, grid_params: Dict[str, List[Any]]) -> List[Dict[Par if hasattr(self.estimator, param): grid = grid.addGrid(getattr(self.estimator, param), values) else: - raise AttributeError(f"{self.estimator.__class__.__name__} does not have attribute {param}") + raise AttributeError( + f"{self.estimator.__class__.__name__} does not have attribute {param}" + ) return grid.build() - def _validate_estimator(self, estimator: Estimator) -> 'MLCVModel': + def _validate_estimator(self, estimator: Estimator) -> "MLCVModel": """ Validate the estimator. @@ -123,7 +156,7 @@ def _validate_estimator(self, estimator: Estimator) -> 'MLCVModel': raise ValueError(f"Estimator must be an instance of {ESTIMATORS_CLASSES}") return self - def _validate_evaluator(self, evaluator: Evaluator) -> 'MLCVModel': + def _validate_evaluator(self, evaluator: Evaluator) -> "MLCVModel": """ Validate the evaluator. @@ -145,7 +178,9 @@ def _validate_estimator_params(self, estimator_params: Dict[str, Any]) -> None: return for param, _ in estimator_params.items(): if not self.estimator.hasParam(param): - raise AttributeError(f"{self.estimator.__class__.__name__} does not have attribute {param}") + raise AttributeError( + f"{self.estimator.__class__.__name__} does not have attribute {param}" + ) def _validate_evaluator_params(self, evaluator_params: Dict[str, Any]) -> None: """ @@ -157,9 +192,11 @@ def _validate_evaluator_params(self, evaluator_params: Dict[str, Any]) -> None: return for param, _ in evaluator_params.items(): if not self.evaluator.hasParam(param): - raise AttributeError(f"{self.evaluator.__class__.__name__} does not have attribute {param}") + raise AttributeError( + f"{self.evaluator.__class__.__name__} does not have attribute {param}" + ) - def _set_evaluator_params(self) -> 'MLCVModel': + def _set_evaluator_params(self) -> "MLCVModel": """ Set evaluator parameters. """ @@ -167,7 +204,7 @@ def _set_evaluator_params(self) -> 'MLCVModel': self.evaluator = self.evaluator.setParams(**self.evaluator_params) return self - def _set_estimator_params(self) -> 'MLCVModel': + def _set_estimator_params(self) -> "MLCVModel": """ Set estimator parameters. """ @@ -175,7 +212,7 @@ def _set_estimator_params(self) -> 'MLCVModel': self.estimator = self.estimator.setParams(**self.estimator_params) return self - def _set_cv_params(self, cv_params: Dict[str, Any]) -> 'MLCVModel': + def _set_cv_params(self, cv_params: Dict[str, Any]) -> "MLCVModel": """ Parse the cross-validator parameters to create an instance of CrossValidator. @@ -187,10 +224,12 @@ def _set_cv_params(self, cv_params: Dict[str, Any]) -> 'MLCVModel': if hasattr(self._cross_validator, param): setattr(self._cross_validator, param, value) else: - raise AttributeError(f"{self._cross_validator.__class__.__name__} does not have attribute {param}") + raise AttributeError( + f"{self._cross_validator.__class__.__name__} does not have attribute {param}" + ) return self - def _set_cross_validator(self) -> 'MLCVModel': + def _set_cross_validator(self) -> "MLCVModel": """ Build the model using the cross-validator. @@ -203,14 +242,16 @@ def _set_cross_validator(self) -> 'MLCVModel': evaluator=self.evaluator, ) if self.cv_params is not None: - self._cross_validator = self._cross_validator.setParams(**self.cv_params) + self._cross_validator = self._cross_validator.setParams( + **self.cv_params + ) return self except Exception as e: print(f"An error occurred while creating the CrossValidator: {str(e)}") # Handle the exception or raise it to be handled by the caller raise - def fit(self, fsdf: FSDataFrame) -> 'MLCVModel': + def fit(self, fsdf: FSDataFrame) -> "MLCVModel": """ Fit the model using the cross-validator. @@ -219,8 +260,14 @@ def fit(self, fsdf: FSDataFrame) -> 'MLCVModel': # Extract the Spark DataFrame and label column name from FSDataFrame self._fsdf = fsdf - if self._cross_validator is None or self.estimator is None or self.evaluator is None: - raise ValueError("Cross-validator, estimator, or evaluator not set properly.") + if ( + self._cross_validator is None + or self.estimator is None + or self.evaluator is None + ): + raise ValueError( + "Cross-validator, estimator, or evaluator not set properly." + ) self._fitted_cv_model = self._cross_validator.fit(self._fsdf.get_sdf_vector()) return self @@ -232,17 +279,21 @@ def _get_best_model(self) -> Model: :return: The best model. """ if self._fitted_cv_model is None: - raise ValueError("CrossValidatorModel not fitted. Use fit() to fit the model.") + raise ValueError( + "CrossValidatorModel not fitted. Use fit() to fit the model." + ) self._best_model = self._fitted_cv_model.bestModel return self._best_model # define a static method that allows to set a ml model based on the model type @staticmethod - def create_model(model_type: str, - estimator_params: Dict[str, Any] = None, - evaluator_params: Dict[str, Any] = None, - grid_params: Dict[str, List[Any]] = None, - cv_params: Dict[str, Any] = None) -> 'MLCVModel': + def create_model( + model_type: str, + estimator_params: Dict[str, Any] = None, + evaluator_params: Dict[str, Any] = None, + grid_params: Dict[str, List[Any]] = None, + cv_params: Dict[str, Any] = None, + ) -> "MLCVModel": """ Set a machine learning model based on the model type. @@ -270,8 +321,10 @@ def create_model(model_type: str, estimator = RandomForestRegressor() evaluator = RegressionEvaluator() else: - raise ValueError(f"Unsupported model type: {model_type}." - f"Supported model types are: {list(ML_METHODS.keys())}") + raise ValueError( + f"Unsupported model type: {model_type}." + f"Supported model types are: {list(ML_METHODS.keys())}" + ) ml_method = MLCVModel( estimator=estimator, @@ -279,7 +332,7 @@ def create_model(model_type: str, estimator_params=estimator_params, evaluator_params=evaluator_params, grid_params=grid_params, - cv_params=cv_params + cv_params=cv_params, ) return ml_method @@ -301,18 +354,22 @@ def get_feature_scores(self) -> pd.DataFrame: # raise exception if the model is not none if best_model is None: - raise ValueError("No ML model have been fitted. Use fit() to fit the model.") + raise ValueError( + "No ML model have been fitted. Use fit() to fit the model." + ) - df_features = pd.DataFrame(indexed_features.to_numpy(), - columns=["features"]) + df_features = pd.DataFrame(indexed_features.to_numpy(), columns=["features"]) - if isinstance(best_model, (RandomForestClassificationModel, RandomForestRegressionModel)): + if isinstance( + best_model, (RandomForestClassificationModel, RandomForestRegressionModel) + ): df_scores = pd.DataFrame( - data=best_model.featureImportances.toArray(), - columns=["scores"] + data=best_model.featureImportances.toArray(), columns=["scores"] ) - df_scores = df_scores.reset_index(level=0).rename(columns={"index": "feature_index"}) + df_scores = df_scores.reset_index(level=0).rename( + columns={"index": "feature_index"} + ) # merge the feature scores with the feature names df = df_features.merge( @@ -323,14 +380,16 @@ def get_feature_scores(self) -> pd.DataFrame: df = df.sort_values(by="scores", ascending=False) # add feature percentile rank to the features_scores dataframe - df['percentile_rank'] = df['scores'].rank(pct=True) + df["percentile_rank"] = df["scores"].rank(pct=True) return df else: - raise ValueError("Unsupported model type. " - "Only RandomForestClassificationModel, " - "RandomForestRegressionModel, and LinearSVCModel are supported.") + raise ValueError( + "Unsupported model type. " + "Only RandomForestClassificationModel, " + "RandomForestRegressionModel, and LinearSVCModel are supported." + ) def get_eval_metric_on_training(self) -> float: """ @@ -347,7 +406,9 @@ def get_eval_metric_on_training(self) -> float: # get the eval metric name from the evaluator eval_metric_name = self.get_eval_metric_name() - if isinstance(best_model, (RandomForestClassificationModel, LogisticRegressionModel)): + if isinstance( + best_model, (RandomForestClassificationModel, LogisticRegressionModel) + ): metric_value = getattr(best_model.summary, eval_metric_name) elif isinstance(best_model, LinearSVCModel): @@ -378,7 +439,10 @@ def get_eval_metric_on_testing(self, test_data: FSDataFrame) -> float: # predict the test data predictions = None - if isinstance(best_model, (RandomForestClassificationModel, LinearSVCModel, LogisticRegressionModel)): + if isinstance( + best_model, + (RandomForestClassificationModel, LinearSVCModel, LogisticRegressionModel), + ): predictions = best_model.transform(test_data.get_sdf_vector()) metric_value = None diff --git a/fslite/fs/multivariate.py b/fslite/fs/multivariate.py index ef00526..3db3414 100644 --- a/fslite/fs/multivariate.py +++ b/fslite/fs/multivariate.py @@ -3,10 +3,14 @@ import numpy as np import pyspark -from pyspark.ml.feature import (VarianceThresholdSelector) +from pyspark.ml.feature import VarianceThresholdSelector from pyspark.ml.stat import Correlation -from fslite.fs.constants import MULTIVARIATE_METHODS, MULTIVARIATE_CORRELATION, MULTIVARIATE_VARIANCE +from fslite.fs.constants import ( + MULTIVARIATE_METHODS, + MULTIVARIATE_CORRELATION, + MULTIVARIATE_VARIANCE, +) from fslite.fs.core import FSDataFrame from fslite.fs.utils import find_maximal_independent_set @@ -18,9 +22,11 @@ @tag("experimental") -def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame, - features_col: str = 'features', - corr_method: str = "pearson") -> np.ndarray: +def _compute_correlation_matrix( + sdf: pyspark.sql.DataFrame, + features_col: str = "features", + corr_method: str = "pearson", +) -> np.ndarray: """ Compute features Matrix Correlation. @@ -31,23 +37,23 @@ def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame, :return: Numpy array. """ - logger.warning("Warning: Computed matrix correlation will be collected into the drive with this implementation.\n" - "This may cause memory issues. Use it preferably with small datasets.") + logger.warning( + "Warning: Computed matrix correlation will be collected into the drive with this implementation.\n" + "This may cause memory issues. Use it preferably with small datasets." + ) logger.info(f"Computing correlation matrix using {corr_method} method.") - mcorr = (Correlation - .corr(sdf, features_col, corr_method) - .collect()[0][0] - .toArray() - ) + mcorr = Correlation.corr(sdf, features_col, corr_method).collect()[0][0].toArray() return mcorr @tag("experimental") -def multivariate_correlation_selector(fsdf: FSDataFrame, - strict: bool = True, - corr_threshold: float = 0.75, - corr_method: str = "pearson") -> List[str]: +def multivariate_correlation_selector( + fsdf: FSDataFrame, + strict: bool = True, + corr_threshold: float = 0.75, + corr_method: str = "pearson", +) -> List[str]: """ Compute the correlation matrix (Pearson) among input features and select those below a specified threshold. @@ -61,17 +67,21 @@ def multivariate_correlation_selector(fsdf: FSDataFrame, :return: List of selected features names """ - colum_vector_features = 'features' + colum_vector_features = "features" sdf = fsdf.get_sdf_vector(output_column_vector=colum_vector_features) # compute correlation matrix - mcorr = _compute_correlation_matrix(sdf, - features_col=colum_vector_features, - corr_method=corr_method) + mcorr = _compute_correlation_matrix( + sdf, features_col=colum_vector_features, corr_method=corr_method + ) mcorr = np.abs(mcorr) # get absolute correlation value - combs_above_cutoff = np.triu(mcorr, k=1) > corr_threshold # create bool matrix that meet criteria - correlated_col_index = tuple(np.column_stack(np.where(combs_above_cutoff))) # get correlated pairs cols index + combs_above_cutoff = ( + np.triu(mcorr, k=1) > corr_threshold + ) # create bool matrix that meet criteria + correlated_col_index = tuple( + np.column_stack(np.where(combs_above_cutoff)) + ) # get correlated pairs cols index index_to_remove = set() if strict: @@ -94,8 +104,9 @@ def multivariate_correlation_selector(fsdf: FSDataFrame, @tag("spark implementation") -def multivariate_variance_selector(fsdf: FSDataFrame, - variance_threshold: float = 0.0) -> List[str]: +def multivariate_variance_selector( + fsdf: FSDataFrame, variance_threshold: float = 0.0 +) -> List[str]: """ Select features after removing low-variance ones (e.g., features with quasi-constant value across samples). @@ -105,15 +116,15 @@ def multivariate_variance_selector(fsdf: FSDataFrame, :return: List of selected features names """ - colum_vector_features = 'features' + colum_vector_features = "features" sdf = fsdf.get_sdf_vector(output_column_vector=colum_vector_features) selector = VarianceThresholdSelector() - (selector - .setFeaturesCol(colum_vector_features) - .setOutputCol("selectedFeatures") - .setVarianceThreshold(variance_threshold) - ) + ( + selector.setFeaturesCol(colum_vector_features) + .setOutputCol("selectedFeatures") + .setVarianceThreshold(variance_threshold) + ) model = selector.fit(sdf) selected_features_indices = set(model.selectedFeatures) @@ -122,9 +133,9 @@ def multivariate_variance_selector(fsdf: FSDataFrame, return selected_features -def multivariate_filter(fsdf: FSDataFrame, - multivariate_method: str = 'm_corr', - **kwargs) -> FSDataFrame: +def multivariate_filter( + fsdf: FSDataFrame, multivariate_method: str = "m_corr", **kwargs +) -> FSDataFrame: """ Filter features after applying a multivariate feature selector method. @@ -139,8 +150,10 @@ def multivariate_filter(fsdf: FSDataFrame, elif multivariate_method == MULTIVARIATE_VARIANCE: selected_features = multivariate_variance_selector(fsdf, **kwargs) else: - raise ValueError(f"Invalid multivariate method: {multivariate_method}. " - f"Choose one of {MULTIVARIATE_METHODS.keys()}.") + raise ValueError( + f"Invalid multivariate method: {multivariate_method}. " + f"Choose one of {MULTIVARIATE_METHODS.keys()}." + ) logger.info(f"Applying multivariate filter {multivariate_method}.") diff --git a/fslite/fs/univariate.py b/fslite/fs/univariate.py index ddf3ac6..ee53b22 100644 --- a/fslite/fs/univariate.py +++ b/fslite/fs/univariate.py @@ -32,7 +32,9 @@ def compute_univariate_corr(df: FSDataFrame) -> Dict[int, float]: } -def univariate_correlation_selector(df: FSDataFrame, corr_threshold: float = 0.3) -> List[int]: +def univariate_correlation_selector( + df: FSDataFrame, corr_threshold: float = 0.3 +) -> List[int]: """ Select features based on their correlation with a label (class), if the correlation value is less than the specified threshold. @@ -43,12 +45,22 @@ def univariate_correlation_selector(df: FSDataFrame, corr_threshold: float = 0.3 :return: List of selected feature indices """ correlations = compute_univariate_corr(df) - selected_features = [feature_index for feature_index, corr in correlations.items() if corr <= corr_threshold] + selected_features = [ + feature_index + for feature_index, corr in correlations.items() + if corr <= corr_threshold + ] return selected_features -def univariate_selector(df: pd.DataFrame, features: List[str], label: str, label_type: str = 'categorical', - selection_mode: str = 'percentile', selection_threshold: float = 0.8) -> List[str]: +def univariate_selector( + df: pd.DataFrame, + features: List[str], + label: str, + label_type: str = "categorical", + selection_mode: str = "percentile", + selection_threshold: float = 0.8, +) -> List[str]: """ Wrapper for scikit-learn's `SelectKBest` feature selector. If the label is categorical, ANOVA test is used; if continuous, F-regression test is used. @@ -66,20 +78,24 @@ def univariate_selector(df: pd.DataFrame, features: List[str], label: str, label X = df[features].values y = df[label].values - if label_type == 'categorical': + if label_type == "categorical": logger.info("ANOVA (F-classification) univariate feature selection") selector = SelectKBest(score_func=f_classif) - elif label_type == 'continuous': + elif label_type == "continuous": logger.info("F-value (F-regression) univariate feature selection") selector = SelectKBest(score_func=f_regression) else: raise ValueError("`label_type` must be one of 'categorical' or 'continuous'") - if selection_mode == 'percentile': - selector.set_params(k='all') # We'll handle the percentile threshold manually + if selection_mode == "percentile": + selector.set_params(k="all") # We'll handle the percentile threshold manually selector.fit(X, y) scores = selector.scores_ - selected_indices = [i for i, score in enumerate(scores) if score >= selection_threshold * max(scores)] + selected_indices = [ + i + for i, score in enumerate(scores) + if score >= selection_threshold * max(scores) + ] else: selector.set_params(k=int(selection_threshold)) selector.fit(X, y) @@ -89,9 +105,9 @@ def univariate_selector(df: pd.DataFrame, features: List[str], label: str, label return selected_features -def univariate_filter(df: FSDataFrame, - univariate_method: str = 'u_corr', - **kwargs) -> FSDataFrame: +def univariate_filter( + df: FSDataFrame, univariate_method: str = "u_corr", **kwargs +) -> FSDataFrame: """ Filter features after applying a univariate feature selector method. @@ -102,22 +118,24 @@ def univariate_filter(df: FSDataFrame, """ if not is_valid_univariate_method(univariate_method): - raise NotImplementedError("The provided method {} is not implented !! please select one from this list {}".format(univariate_method, get_fs_univariate_methods())) + raise NotImplementedError( + "The provided method {} is not implemented !! please select one from this list {}".format( + univariate_method, get_fs_univariate_methods() + ) + ) selected_features = [] - if univariate_method == 'anova': + if univariate_method == "anova": # TODO: Implement ANOVA selector # selected_features = univariate_selector(df, features, label, label_type='categorical', **kwargs) pass - elif univariate_method == 'f_regression': + elif univariate_method == "f_regression": # TODO: Implement F-regression selector # selected_features = univariate_selector(df, features, label, label_type='continuous', **kwargs) pass - elif univariate_method == 'u_corr': + elif univariate_method == "u_corr": selected_features = univariate_correlation_selector(df, **kwargs) - else: - raise ValueError(f"Univariate method {univariate_method} not supported.") logger.info(f"Applying univariate filter using method: {univariate_method}") diff --git a/fslite/fs/utils.py b/fslite/fs/utils.py index 9fc6a70..80ca059 100644 --- a/fslite/fs/utils.py +++ b/fslite/fs/utils.py @@ -30,7 +30,8 @@ def compute_missingness_rate(fsdf: FSDataFrame) -> Dict[str, float]: missing_rates = sdf.select( [ ( - f.sum(f.when(f.isnan(sdf[c]) | f.isnull(sdf[c]), 1).otherwise(0)) / n_instances + f.sum(f.when(f.isnan(sdf[c]) | f.isnull(sdf[c]), 1).otherwise(0)) + / n_instances ).alias(c) for c in features ] @@ -40,7 +41,7 @@ def compute_missingness_rate(fsdf: FSDataFrame) -> Dict[str, float]: def remove_features_by_missingness_rate( - fsdf: FSDataFrame, threshold: float = 0.15 + fsdf: FSDataFrame, threshold: float = 0.15 ) -> FSDataFrame: """ Remove features from FSDataFrame with missingness rate higher or equal than a specified threshold. @@ -107,7 +108,9 @@ def find_maximal_independent_set(pairs: Tuple[int], keep: bool = True) -> Set[in :return: Set of indices (maximal independent set or remaining indices). """ - logger.warning("This method is experimental and have been not extensively tested...") + logger.warning( + "This method is experimental and have been not extensively tested..." + ) graph = nx.Graph() graph.add_edges_from(pairs) diff --git a/fslite/pipeline/fs_pipeline_example.py b/fslite/pipeline/fs_pipeline_example.py index 3c4c498..32159e9 100644 --- a/fslite/pipeline/fs_pipeline_example.py +++ b/fslite/pipeline/fs_pipeline_example.py @@ -13,13 +13,15 @@ from fslite.utils.io import import_table_as_psdf # Init spark -init_spark(apply_pyarrow_settings=True, - apply_extra_spark_settings=True, - apply_pandas_settings=True) +init_spark( + apply_pyarrow_settings=True, + apply_extra_spark_settings=True, + apply_pandas_settings=True, +) # 1. Import data df = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5) -fsdf = FSDataFrame(df, sample_col='Sample', label_col='label') +fsdf = FSDataFrame(df, sample_col="Sample", label_col="label") # 2. Split data training_data, testing_data = fsdf.split_df(split_training_factor=0.6) @@ -27,33 +29,32 @@ # 3. Set feature selection methods # create a Univariate object univariate = FSUnivariate( - fs_method='anova', - selection_mode='percentile', - selection_threshold=0.8) + fs_method="anova", selection_mode="percentile", selection_threshold=0.8 +) # create a Multivariate object multivariate = FSMultivariate( - fs_method='m_corr', - corr_threshold=0.75, - corr_method="pearson" + fs_method="m_corr", corr_threshold=0.75, corr_method="pearson" ) # create a MLMethod object rf_classifier = FSMLMethod( - fs_method='rf_multilabel', + fs_method="rf_multilabel", rfe=True, rfe_iterations=2, percent_to_keep=0.9, - estimator_params={'labelCol': 'label'}, - evaluator_params={'metricName': 'accuracy'}, - grid_params={'numTrees': [10, 15], 'maxDepth': [5, 10]}, - cv_params={'parallelism': 2, 'numFolds': 5} + estimator_params={"labelCol": "label"}, + evaluator_params={"metricName": "accuracy"}, + grid_params={"numTrees": [10, 15], "maxDepth": [5, 10]}, + cv_params={"parallelism": 2, "numFolds": 5}, ) # 4. Create a pipeline object -fs_pipeline = FSPipeline(df_training=training_data, - df_testing=testing_data, - fs_stages=[univariate, multivariate, rf_classifier]) +fs_pipeline = FSPipeline( + df_training=training_data, + df_testing=testing_data, + fs_stages=[univariate, multivariate, rf_classifier], +) # 5. Run the pipeline results = fs_pipeline.run() @@ -61,7 +62,7 @@ # Print results print(f"Accuracy on training: {results['training_metric']}") print(f"Accuracy on testing: {results['testing_metric']}") -print(results.get('feature_scores')) +print(results.get("feature_scores")) stop_spark_session() diff --git a/fslite/tests/generate_big_tests.py b/fslite/tests/generate_big_tests.py index eb7d677..ccc6f19 100644 --- a/fslite/tests/generate_big_tests.py +++ b/fslite/tests/generate_big_tests.py @@ -5,6 +5,7 @@ import pyarrow as pa import pyarrow.parquet as pq + def test_generate_big_dataset(): # Parameters for the dataset n_samples = 1200 @@ -13,15 +14,19 @@ def test_generate_big_dataset(): # Generate sample IDs and labels sample_ids = np.arange(1, n_samples + 1) - labels = np.random.choice(['LV', 'RV', 'LA', 'RA'], size=n_samples) + labels = np.random.choice(["LV", "RV", "LA", "RA"], size=n_samples) # Parquet schema definition - schema = pa.schema([pa.field('sample_id', pa.int32()), pa.field('label', pa.string())] + - [pa.field(f'feature{i}', pa.float32()) for i in range(1, n_features + 1)]) + schema = pa.schema( + [pa.field("sample_id", pa.int32()), pa.field("label", pa.string())] + + [pa.field(f"feature{i}", pa.float32()) for i in range(1, n_features + 1)] + ) # Create an empty Parquet file - output_file = 'large_dataset_optimized_samples_{}_features_{}.parquet'.format(n_samples, n_features) - with pq.ParquetWriter(output_file, schema, compression='snappy') as writer: + output_file = "large_dataset_optimized_samples_{}_features_{}.parquet".format( + n_samples, n_features + ) + with pq.ParquetWriter(output_file, schema, compression="snappy") as writer: # Process in chunks to reduce memory usage for chunk_start in range(0, n_samples, chunk_size): chunk_end = min(chunk_start + chunk_size, n_samples) @@ -31,13 +36,13 @@ def test_generate_big_dataset(): chunk_labels = labels[chunk_start:chunk_end] # Generate chunk of features - chunk_features = {f'feature{i}': np.random.rand(chunk_end - chunk_start) for i in range(1, n_features + 1)} + chunk_features = { + f"feature{i}": np.random.rand(chunk_end - chunk_start) + for i in range(1, n_features + 1) + } # Create DataFrame chunk - chunk_data = { - 'sample_id': chunk_sample_ids, - 'label': chunk_labels - } + chunk_data = {"sample_id": chunk_sample_ids, "label": chunk_labels} chunk_data.update(chunk_features) df_chunk = pd.DataFrame(chunk_data) @@ -45,7 +50,6 @@ def test_generate_big_dataset(): # Convert to PyArrow Table and write chunk to Parquet file table_chunk = pa.Table.from_pandas(df_chunk, schema=schema) writer.write_table(table_chunk) - logging.info(f'Processed samples {chunk_start + 1} to {chunk_end}') + logging.info(f"Processed samples {chunk_start + 1} to {chunk_end}") print("Optimized Parquet file created successfully!") - diff --git a/fslite/tests/test_data_preprocessing.py b/fslite/tests/test_data_preprocessing.py index 9e35ad7..a247491 100644 --- a/fslite/tests/test_data_preprocessing.py +++ b/fslite/tests/test_data_preprocessing.py @@ -4,7 +4,11 @@ from fslite.config.context import init_spark, stop_spark_session from fslite.fs.core import FSDataFrame -from fslite.fs.utils import compute_missingness_rate, remove_features_by_missingness_rate, impute_missing +from fslite.fs.utils import ( + compute_missingness_rate, + remove_features_by_missingness_rate, + impute_missing, +) from fslite.utils.datasets import get_tnbc_data_missing_values_path from fslite.utils.io import import_table_as_psdf @@ -16,9 +20,11 @@ class TestDataPreprocessing(unittest.TestCase): """ def setUp(self) -> None: - init_spark(apply_pyarrow_settings=True, - apply_extra_spark_settings=True, - apply_pandas_settings=True) + init_spark( + apply_pyarrow_settings=True, + apply_extra_spark_settings=True, + apply_pandas_settings=True, + ) def tearDown(self) -> None: stop_spark_session() @@ -32,7 +38,7 @@ def import_FSDataFrame() -> FSDataFrame: :return: """ df = import_table_as_psdf(get_tnbc_data_missing_values_path(), n_partitions=5) - fsdf = FSDataFrame(df, sample_col='Sample', label_col='label') + fsdf = FSDataFrame(df, sample_col="Sample", label_col="label") return fsdf def test_compute_missingness_rate(self): @@ -43,8 +49,8 @@ def test_compute_missingness_rate(self): fsdf = self.import_FSDataFrame() features_missing_rates = compute_missingness_rate(fsdf) - self.assertEqual(features_missing_rates.get('tr|E9PBJ4'), 0.0) - self.assertAlmostEqual(features_missing_rates.get('sp|P07437'), 0.295, places=2) + self.assertEqual(features_missing_rates.get("tr|E9PBJ4"), 0.0) + self.assertAlmostEqual(features_missing_rates.get("sp|P07437"), 0.295, places=2) def test_filter_by_missingness_rate(self): """ @@ -66,7 +72,7 @@ def test_impute_missing(self): """ fsdf = self.import_FSDataFrame() - fsdf = impute_missing(fsdf, strategy='mean') + fsdf = impute_missing(fsdf, strategy="mean") # Collect features as array array = fsdf._collect_features_as_array() @@ -75,5 +81,5 @@ def test_impute_missing(self): self.assertFalse(np.isnan(array).any()) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/fslite/tests/test_fs_pipeline.py b/fslite/tests/test_fs_pipeline.py index ca5517e..6b8176e 100644 --- a/fslite/tests/test_fs_pipeline.py +++ b/fslite/tests/test_fs_pipeline.py @@ -10,9 +10,11 @@ class FeatureSelectionPipelineTest(unittest.TestCase): def setUp(self) -> None: - init_spark(apply_pyarrow_settings=True, - apply_extra_spark_settings=True, - apply_pandas_settings=True) + init_spark( + apply_pyarrow_settings=True, + apply_extra_spark_settings=True, + apply_pandas_settings=True, + ) def tearDown(self) -> None: stop_spark_session() @@ -20,7 +22,7 @@ def tearDown(self) -> None: @staticmethod def import_FSDataFrame(): df = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5) - fsdf = FSDataFrame(df, sample_col='Sample', label_col='label') + fsdf = FSDataFrame(df, sample_col="Sample", label_col="label") return fsdf def test_feature_selection_pipeline(self): @@ -30,33 +32,32 @@ def test_feature_selection_pipeline(self): # create a Univariate object univariate = FSUnivariate( - fs_method='anova', - selection_mode='percentile', - selection_threshold=0.8) + fs_method="anova", selection_mode="percentile", selection_threshold=0.8 + ) # create a Multivariate object multivariate = FSMultivariate( - fs_method='m_corr', - corr_threshold=0.75, - corr_method="pearson" + fs_method="m_corr", corr_threshold=0.75, corr_method="pearson" ) # create a MLMethod object rf_classifier = FSMLMethod( - fs_method='rf_multilabel', + fs_method="rf_multilabel", rfe=True, rfe_iterations=2, percent_to_keep=0.9, - estimator_params={'labelCol': 'label'}, - evaluator_params={'metricName': 'accuracy'}, - grid_params={'numTrees': [10, 15], 'maxDepth': [5, 10]}, - cv_params={'parallelism': 2, 'numFolds': 5} + estimator_params={"labelCol": "label"}, + evaluator_params={"metricName": "accuracy"}, + grid_params={"numTrees": [10, 15], "maxDepth": [5, 10]}, + cv_params={"parallelism": 2, "numFolds": 5}, ) # create a pipeline object - fs_pipeline = FSPipeline(df_training=training_data, - df_testing=testing_data, - fs_stages=[univariate, multivariate, rf_classifier]) + fs_pipeline = FSPipeline( + df_training=training_data, + df_testing=testing_data, + fs_stages=[univariate, multivariate, rf_classifier], + ) # run the pipeline results = fs_pipeline.run() @@ -64,8 +65,8 @@ def test_feature_selection_pipeline(self): # print results print(results) - assert results.get('training_metric') > 0.9 + assert results.get("training_metric") > 0.9 -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/fslite/tests/test_fsdataframe.py b/fslite/tests/test_fsdataframe.py index 6b67863..adda1bb 100644 --- a/fslite/tests/test_fsdataframe.py +++ b/fslite/tests/test_fsdataframe.py @@ -6,70 +6,69 @@ from fslite.fs.fdataframe import FSDataFrame + def test_initializes_fsdataframe(): # Create a sample DataFrame data = { - 'sample_id': [1, 2, 3], - 'label': ['A', 'B', 'C'], - 'feature1': [0.1, 0.2, 0.3], - 'feature2': [1.1, 1.2, 1.3] + "sample_id": [1, 2, 3], + "label": ["A", "B", "C"], + "feature1": [0.1, 0.2, 0.3], + "feature2": [1.1, 1.2, 1.3], } df = pd.DataFrame(data) # Initialize FSDataFrame - fs_df = FSDataFrame( - df=df, - sample_col='sample_id', - label_col='label' - ) + fs_df = FSDataFrame(df=df, sample_col="sample_id", label_col="label") # Assertions to check if the initialization is correct assert isinstance(fs_df, FSDataFrame) - assert fs_df.get_sample_col_name() == 'sample_id' + assert fs_df.get_sample_col_name() == "sample_id" + def test_scaler_df(): # Create a sample DataFrame data = { - 'sample_id': [1, 2, 3], - 'label': ['A', 'B', 'C'], - 'feature1': [0.1, 0.2, 0.3], - 'feature2': [1.1, 1.2, 1.3] + "sample_id": [1, 2, 3], + "label": ["A", "B", "C"], + "feature1": [0.1, 0.2, 0.3], + "feature2": [1.1, 1.2, 1.3], } df = pd.DataFrame(data) # Initialize FSDataFrame - fs_df = FSDataFrame( - df=df, - sample_col='sample_id', - label_col='label' - ) + fs_df = FSDataFrame(df=df, sample_col="sample_id", label_col="label") # Scale the DataFrame - fs_df.scale_features(scaler_method='standard') + fs_df.scale_features(scaler_method="standard") # Assertions to check if the scaling is correct assert fs_df.is_scaled() == True - assert fs_df.get_scaled_method() == 'standard' + assert fs_df.get_scaled_method() == "standard" + def test_memory_fsdataframe(): - def create_test_data(n_samples: int, n_features: int, zero_prob: float = 0.1, nan_prob: float = 0.05): + def create_test_data( + n_samples: int, n_features: int, zero_prob: float = 0.1, nan_prob: float = 0.05 + ): """Create test data for FSDataFrame.""" data = np.random.rand(n_samples, n_features) data[np.random.rand(n_samples, n_features) < zero_prob] = 0 data[np.random.rand(n_samples, n_features) < nan_prob] = np.nan - df = pd.DataFrame(data, columns=[f'feature_{i}' for i in range(n_features)]) - df['sample_id'] = [f'sample_{i}' for i in range(n_samples)] - df['label'] = np.random.choice(['A', 'B'], n_samples) + df = pd.DataFrame(data, columns=[f"feature_{i}" for i in range(n_features)]) + df["sample_id"] = [f"sample_{i}" for i in range(n_samples)] + df["label"] = np.random.choice(["A", "B"], n_samples) return df - def measure_memory_usage(n_samples: int, n_features: int, nan_prob = 0.01) -> float: + def measure_memory_usage(n_samples: int, n_features: int, nan_prob=0.01) -> float: """Measure memory usage of FSDataFrame for given number of samples and features.""" df = create_test_data(n_samples, n_features, nan_prob=nan_prob) - mem_usage = memory_usage((FSDataFrame, (df, 'sample_id', 'label')), max_iterations=1)[0] + mem_usage = memory_usage( + (FSDataFrame, (df, "sample_id", "label")), max_iterations=1 + )[0] gc.collect() # Force garbage collection to free memory return mem_usage @@ -84,25 +83,36 @@ def measure_memory_usage(n_samples: int, n_features: int, nan_prob = 0.01) -> fl for n_features in feature_sizes: for prob in nan_prob: mem_usage = measure_memory_usage(n_samples, n_features, nan_prob=prob) - results.append((n_samples, n_features, prob, mem_usage)) # Append prob to results + results.append( + (n_samples, n_features, prob, mem_usage) + ) # Append prob to results # Convert results to DataFrame - results_df = pd.DataFrame(results, columns=['Samples', 'Features', 'NAN Prob', 'Memory (MB)']) + results_df = pd.DataFrame( + results, columns=["Samples", "Features", "NAN Prob", "Memory (MB)"] + ) # Create 2D line plot plt.figure(figsize=(12, 8)) for feature_size in feature_sizes: for prob in nan_prob: - data = results_df[(results_df['Features'] == feature_size) & (results_df['NAN Prob'] == prob)] - plt.plot(data['Samples'], data['Memory (MB)'], marker='o', - label=f'{feature_size} Features - {prob} NAN Prob') - - plt.xlabel('Number of Samples') - plt.ylabel('Memory Usage (MB)') - plt.title('FSDataFrame Memory Usage') + data = results_df[ + (results_df["Features"] == feature_size) + & (results_df["NAN Prob"] == prob) + ] + plt.plot( + data["Samples"], + data["Memory (MB)"], + marker="o", + label=f"{feature_size} Features - {prob} NAN Prob", + ) + + plt.xlabel("Number of Samples") + plt.ylabel("Memory Usage (MB)") + plt.title("FSDataFrame Memory Usage") plt.legend() - plt.xscale('log') # Using log scale for x-axis to better visualize the range + plt.xscale("log") # Using log scale for x-axis to better visualize the range plt.tight_layout() plt.show() @@ -110,4 +120,3 @@ def measure_memory_usage(n_samples: int, n_features: int, nan_prob = 0.01) -> fl print(results_df.to_string(index=False)) # Initialize FSDataFrame with DataFrame having sparse numerical features and insufficient memory for dense matrix - diff --git a/fslite/tests/test_import_export.py b/fslite/tests/test_import_export.py index 05d801f..507f379 100644 --- a/fslite/tests/test_import_export.py +++ b/fslite/tests/test_import_export.py @@ -11,9 +11,11 @@ class TestImportExport(unittest.TestCase): def setUp(self) -> None: - init_spark(apply_pyarrow_settings=True, - apply_extra_spark_settings=True, - apply_pandas_settings=True) + init_spark( + apply_pyarrow_settings=True, + apply_extra_spark_settings=True, + apply_pandas_settings=True, + ) def tearDown(self) -> None: stop_spark_session() @@ -23,8 +25,7 @@ def test_import_tsv(self): Test import tsv file as Spark DataFrame. :return: None """ - df = import_table(path=get_tnbc_data_path(), - n_partitions=10) + df = import_table(path=get_tnbc_data_path(), n_partitions=10) self.assertIsInstance(df, pyspark.sql.DataFrame) self.assertEqual(df.count(), 44) @@ -34,12 +35,11 @@ def test_import_tsv_as_psdf(self): Test import tsv file as Pandas on Spark DataFrame (PoS). :return: None """ - df = import_table_as_psdf(path=get_tnbc_data_path(), - n_partitions=10) + df = import_table_as_psdf(path=get_tnbc_data_path(), n_partitions=10) self.assertIsInstance(df, ps.frame.DataFrame) - self.assertEqual(df.shape, (44, 502)) + self.assertEqual(df.shape, (44, 502)) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/fslite/tests/test_ml_methods.py b/fslite/tests/test_ml_methods.py index 1afd46f..5b624d5 100644 --- a/fslite/tests/test_ml_methods.py +++ b/fslite/tests/test_ml_methods.py @@ -1,9 +1,10 @@ import unittest -from pyspark.ml.classification import (RandomForestClassifier, - LogisticRegression) -from pyspark.ml.evaluation import (BinaryClassificationEvaluator, - MulticlassClassificationEvaluator) +from pyspark.ml.classification import RandomForestClassifier, LogisticRegression +from pyspark.ml.evaluation import ( + BinaryClassificationEvaluator, + MulticlassClassificationEvaluator, +) from fslite.config.context import init_spark, stop_spark_session from fslite.fs.core import FSDataFrame @@ -15,9 +16,11 @@ class MLMethodTest(unittest.TestCase): def setUp(self) -> None: - init_spark(apply_pyarrow_settings=True, - apply_extra_spark_settings=True, - apply_pandas_settings=True) + init_spark( + apply_pyarrow_settings=True, + apply_extra_spark_settings=True, + apply_pandas_settings=True, + ) def tearDown(self) -> None: stop_spark_session() @@ -25,20 +28,20 @@ def tearDown(self) -> None: @staticmethod def import_FSDataFrame(): df = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5) - fsdf = FSDataFrame(df, sample_col='Sample', label_col='label') + fsdf = FSDataFrame(df, sample_col="Sample", label_col="label") return fsdf def test_build_model_using_cross_validator(self): fsdf = self.import_FSDataFrame() estimator = RandomForestClassifier() evaluator = BinaryClassificationEvaluator() - grid_params = {'numTrees': [10, 20, 30], 'maxDepth': [5, 10, 15]} + grid_params = {"numTrees": [10, 20, 30], "maxDepth": [5, 10, 15]} ml_method = MLCVModel( estimator=estimator, evaluator=evaluator, estimator_params=None, grid_params=None, - cv_params=None + cv_params=None, ) print(ml_method._cross_validator.__str__()) @@ -51,21 +54,19 @@ def test_get_feature_scores_random_forest_classifier(self): # Create a RandomForestClassifier model estimator = RandomForestClassifier() evaluator = MulticlassClassificationEvaluator() - estimator_params = {'labelCol': 'label'} - grid_params = {'numTrees': [10, 20, 30], 'maxDepth': [5, 10, 15]} - cv_params = {'parallelism': 2, 'numFolds': 5, 'collectSubModels': False} + estimator_params = {"labelCol": "label"} + grid_params = {"numTrees": [10, 20, 30], "maxDepth": [5, 10, 15]} + cv_params = {"parallelism": 2, "numFolds": 5, "collectSubModels": False} ml_method = MLCVModel( estimator=estimator, evaluator=evaluator, estimator_params=estimator_params, grid_params=grid_params, - cv_params=cv_params + cv_params=cv_params, ) - (ml_method - .fit(fsdf) - ) + (ml_method.fit(fsdf)) # Get the feature scores feature_scores = ml_method.get_feature_scores() @@ -74,11 +75,11 @@ def test_get_feature_scores_random_forest_classifier(self): assert not feature_scores.empty # Assert that the feature scores DataFrame has the expected columns - expected_columns = ['features', 'feature_index', 'scores', 'percentile_rank'] + expected_columns = ["features", "feature_index", "scores", "percentile_rank"] assert list(feature_scores.columns) == expected_columns # check if dataframe is sorted by scores (descending) - assert feature_scores['scores'].is_monotonic_decreasing + assert feature_scores["scores"].is_monotonic_decreasing print(feature_scores) @@ -87,22 +88,20 @@ def test_multilabel_rf_model(self): training_data, testing_data = fsdf.split_df(split_training_factor=0.8) estimator = RandomForestClassifier() - evaluator = MulticlassClassificationEvaluator(metricName='accuracy') - estimator_params = {'labelCol': 'label'} - grid_params = {'numTrees': [5, 10], 'maxDepth': [3, 5]} - cv_params = {'parallelism': 2, 'numFolds': 3} + evaluator = MulticlassClassificationEvaluator(metricName="accuracy") + estimator_params = {"labelCol": "label"} + grid_params = {"numTrees": [5, 10], "maxDepth": [3, 5]} + cv_params = {"parallelism": 2, "numFolds": 3} ml_method = MLCVModel( estimator=estimator, evaluator=evaluator, estimator_params=estimator_params, grid_params=grid_params, - cv_params=cv_params + cv_params=cv_params, ) - (ml_method - .fit(training_data) - ) + (ml_method.fit(training_data)) # get the accuracy on training eval_training = ml_method.get_eval_metric_on_training() @@ -118,22 +117,20 @@ def test_multilabel_lr_model(self): training_data, testing_data = fsdf.split_df(split_training_factor=0.6) estimator = LogisticRegression() - evaluator = MulticlassClassificationEvaluator(metricName='accuracy') - estimator_params = {'labelCol': 'label'} - grid_params = {'regParam': [0.1, 0.01]} - cv_params = {'parallelism': 2, 'numFolds': 3} + evaluator = MulticlassClassificationEvaluator(metricName="accuracy") + estimator_params = {"labelCol": "label"} + grid_params = {"regParam": [0.1, 0.01]} + cv_params = {"parallelism": 2, "numFolds": 3} ml_method = MLCVModel( estimator=estimator, evaluator=evaluator, estimator_params=estimator_params, grid_params=grid_params, - cv_params=cv_params + cv_params=cv_params, ) - (ml_method - .fit(training_data) - ) + (ml_method.fit(training_data)) # get the accuracy on training eval_training = ml_method.get_eval_metric_on_training() @@ -150,19 +147,19 @@ def test_FSMLMethod(self): fsdf = self.import_FSDataFrame() training_data, testing_data = fsdf.split_df(split_training_factor=0.7) - estimator_params = {'labelCol': 'label'} - grid_params = {'numTrees': [5, 10], 'maxDepth': [3, 5]} - cv_params = {'parallelism': 2, 'numFolds': 3} + estimator_params = {"labelCol": "label"} + grid_params = {"numTrees": [5, 10], "maxDepth": [3, 5]} + cv_params = {"parallelism": 2, "numFolds": 3} ml_method = FSMLMethod( - fs_method='rf_multilabel', + fs_method="rf_multilabel", rfe=True, rfe_iterations=2, percent_to_keep=0.9, estimator_params=estimator_params, - evaluator_params={'metricName': 'accuracy'}, + evaluator_params={"metricName": "accuracy"}, grid_params=grid_params, - cv_params=cv_params + cv_params=cv_params, ) filtered_fsdf = ml_method.select_features(training_data) @@ -176,5 +173,5 @@ def test_FSMLMethod(self): assert testing_acc > 0.7 -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/fslite/tests/test_univariate_methods.py b/fslite/tests/test_univariate_methods.py index 16ae0a5..e97d636 100644 --- a/fslite/tests/test_univariate_methods.py +++ b/fslite/tests/test_univariate_methods.py @@ -4,6 +4,7 @@ from fslite.fs.univariate import univariate_filter + def test_univariate_filter_corr(): """ Test univariate_filter method with 'u_corr' method. @@ -11,17 +12,18 @@ def test_univariate_filter_corr(): """ # import tsv as pandas DataFrame - df = pd.read_csv(get_tnbc_data_path(), sep='\t') + df = pd.read_csv(get_tnbc_data_path(), sep="\t") # create FSDataFrame instance - fs_df = FSDataFrame(df=df,sample_col='Sample',label_col='label') + fs_df = FSDataFrame(df=df, sample_col="Sample", label_col="label") - fsdf_filtered = univariate_filter(fs_df, univariate_method='u_corr', corr_threshold=0.3) + fsdf_filtered = univariate_filter( + fs_df, univariate_method="u_corr", corr_threshold=0.3 + ) assert fs_df.count_features() == 500 assert fsdf_filtered.count_features() == 211 # Export the filtered DataFrame as Pandas DataFrame df_filtered = fsdf_filtered.to_pandas() - df_filtered.to_csv('filtered_tnbc_data.csv', index=False) - + df_filtered.to_csv("filtered_tnbc_data.csv", index=False) diff --git a/fslite/utils/generic.py b/fslite/utils/generic.py index 5674998..9ab70e8 100644 --- a/fslite/utils/generic.py +++ b/fslite/utils/generic.py @@ -11,10 +11,12 @@ def tag(label: str): :param label: tag label :return: decorator """ + def decorator(func): def wrapper(*args, **kwargs): print(f"Tag for {func.__name__}: {label}") return func(*args, **kwargs) return wrapper + return decorator diff --git a/fslite/utils/io.py b/fslite/utils/io.py index 85adb13..74c202c 100644 --- a/fslite/utils/io.py +++ b/fslite/utils/io.py @@ -8,10 +8,9 @@ warnings.filterwarnings("ignore") -def import_table(path: str, - header: bool = True, - sep: str = "\t", - n_partitions: int = 5) -> pyspark.sql.DataFrame: +def import_table( + path: str, header: bool = True, sep: str = "\t", n_partitions: int = 5 +) -> pyspark.sql.DataFrame: """ Import tsv file as Spark DataFrame. @@ -28,19 +27,17 @@ def import_table(path: str, if _sc is None: raise ValueError("Active Spark Session not found...") - sdf = (_sc - .read - .option("delimiter", sep) - .option("header", header) - .option("inferSchema", "true") - .csv(path) - .repartition(n_partitions) - ) + sdf = ( + _sc.read.option("delimiter", sep) + .option("header", header) + .option("inferSchema", "true") + .csv(path) + .repartition(n_partitions) + ) return sdf -def import_parquet(path: str, - header: bool = True) -> pyspark.sql.DataFrame: +def import_parquet(path: str, header: bool = True) -> pyspark.sql.DataFrame: """ Import parquet file as Spark DataFrame. @@ -55,18 +52,13 @@ def import_parquet(path: str, if _sc is None: raise ValueError("Active Spark Session not found...") - sdf = (_sc - .read - .option("header", header) - .option("inferSchema", "true") - .parquet(path) - ) + sdf = _sc.read.option("header", header).option("inferSchema", "true").parquet(path) return sdf -def import_table_as_psdf(path: str, - sep: str = "\t", - n_partitions: int = 5) -> pyspark.pandas.DataFrame: +def import_table_as_psdf( + path: str, sep: str = "\t", n_partitions: int = 5 +) -> pyspark.pandas.DataFrame: """ Import tsv file as Pandas on Spark DataFrame @@ -80,13 +72,10 @@ def import_table_as_psdf(path: str, import pyspark.pandas as ps # apply settings for pandas on spark api - [ps.set_option(k, PANDAS_ON_SPARK_API_SETTINGS.get(k)) - for k in PANDAS_ON_SPARK_API_SETTINGS.keys()] - - psdf = (ps - .read_csv(path, - sep=sep) - .spark - .repartition(n_partitions) - ) + [ + ps.set_option(k, PANDAS_ON_SPARK_API_SETTINGS.get(k)) + for k in PANDAS_ON_SPARK_API_SETTINGS.keys() + ] + + psdf = ps.read_csv(path, sep=sep).spark.repartition(n_partitions) return psdf diff --git a/setup.py b/setup.py index 73c353d..b4bb0c4 100644 --- a/setup.py +++ b/setup.py @@ -4,13 +4,13 @@ long_description = fh.read() setup( - name='fslite', - version='0.0.1', - url='https://github.com/bigbio/fsspark', - license='Apache-2.0', - author='Enrique Audain Martinez', - author_email='enrique.audain@gmail.com', - description='Feature selection in Spark', + name="fslite", + version="0.0.1", + url="https://github.com/bigbio/fsspark", + license="Apache-2.0", + author="Enrique Audain Martinez", + author_email="enrique.audain@gmail.com", + description="Feature selection in Spark", long_description=long_description, long_description_content_type="text/markdown", packages=find_packages(), @@ -20,7 +20,7 @@ "networkx", "setuptools", "pandas", - "pyarrow" + "pyarrow", ], classifiers=[ # Classifiers for your package @@ -28,5 +28,5 @@ "License :: OSI Approved :: MIT License", "Operating System :: POSIX :: Linux", ], - python_requires='>=3.9.0', + python_requires=">=3.9.0", )