diff --git a/README.md b/README.md index f8d3e5c..617ce34 100644 --- a/README.md +++ b/README.md @@ -1,41 +1,43 @@ -[![Python application](https://github.com/enriquea/fsspark/actions/workflows/python-app.yml/badge.svg?branch=main)](https://github.com/enriquea/fsspark/actions/workflows/python-app.yml) -[![Python Package using Conda](https://github.com/enriquea/fsspark/actions/workflows/python-package-conda.yml/badge.svg?branch=main)](https://github.com/enriquea/fsspark/actions/workflows/python-package-conda.yml) +[![Python application](https://github.com/bigbio/fslite/actions/workflows/python-app.yml/badge.svg?branch=main)](https://github.com/bigbio/fslite/actions/workflows/python-app.yml) +[![Python Package using Conda](https://github.com/bigbio/fslite/actions/workflows/python-package-conda.yml/badge.svg?branch=main)](https://github.com/bigbio/fslite/actions/workflows/python-package-conda.yml) -# fsspark +# fslite --- -## Feature selection in Spark +### Memory-Efficient, High-Performance Feature Selection Library for Big and Small Datasets ### Description -`fsspark` is a python module to perform feature selection and machine learning based on spark. -Pipelines written using `fsspark` can be divided roughly in four major stages: 1) data pre-processing, 2) univariate +`fslite` is a python module to perform feature selection and machine learning using pre-built FS pipelines. +Pipelines written using `fslite` can be divided roughly in four major stages: 1) data pre-processing, 2) univariate filters, 3) multivariate filters and 4) machine learning wrapped with cross-validation (**Figure 1**). +`fslite` is based on our previous work [feseR](https://github.com/enriquea/feseR), which was originally implemented in R with the caret package; the publication can be found [here](https://journals.plos.org/plosone/article?id=10.1371/journal.pone.0189875). + ![Feature Selection flowchart](images/fs_workflow.png) -**Figure 1**. Feature selection workflow example implemented in fsspark. 
+**Figure 1**. Feature selection workflow example implemented in fslite. ### Documentation The package documentation describes the [data structures](docs/README.data.md) and -[features selection methods](docs/README.methods.md) implemented in `fsspark`. +[features selection methods](docs/README.methods.md) implemented in `fslite`. ### Installation - pip ```bash -git clone https://github.com/enriquea/fsspark.git -cd fsspark +git clone https://github.com/bigbio/fslite.git +cd fslite pip install . -r requirements.txt ``` - conda ```bash -git clone https://github.com/enriquea/fsspark.git -cd fsspark +git clone https://github.com/bigbio/fslite.git +cd fslite conda env create -f environment.yml -conda activate fsspark-venv +conda activate fslite-venv pip install . -r requirements.txt ``` diff --git a/docs/EXPERIMENTS.md b/docs/EXPERIMENTS.md new file mode 100644 index 0000000..777d1e0 --- /dev/null +++ b/docs/EXPERIMENTS.md @@ -0,0 +1,4 @@ +## Experiments and Benchmarks + +This document contains the experiments and benchmarks that were conducted to evaluate the performance of fslite. +The experiments were conducted on the following datasets: \ No newline at end of file diff --git a/docs/README.data.md b/docs/README.data.md index e812609..5305b74 100644 --- a/docs/README.data.md +++ b/docs/README.data.md @@ -1,9 +1,9 @@ -## fsspark - data structures +## fslite - data structures --- -`fsspark` is a Python package that provides a set of tools for feature selection in Spark. -Here we describe the main data structures used in `fsspark` and how to use them. +`fslite` is a Python package that provides a set of tools for feature selection in Spark. +Here we describe the main data structures used in `fslite` and how to use them. ### Input data @@ -32,30 +32,32 @@ The following is an example of a TSV file with a binary response variable: ### Import functions -`fsspark` provides two main functions to import data from a TSV file. 
+`fslite` provides two main functions to import data from a TSV file. - `import_table` - Import data from a TSV file into a Spark Data Frame (sdf). ```python -from fsspark.utils.io import import_table -sdf = import_table('data.tsv.bgz', - sep='\t', - n_partitions=5) +from fslite.utils.io import import_table + +sdf = import_table('data.tsv.bgz', + sep='\t', + n_partitions=5) ``` - `import_table_as_psdf` - Import data from a TSV file into a Spark Data Frame (sdf) and convert it into a Pandas on Spark Data Frame (psdf). ```python -from fsspark.utils.io import import_table_as_psdf -psdf = import_table_as_psdf('data.tsv.bgz', - sep='\t', +from fslite.utils.io import import_table_as_psdf + +psdf = import_table_as_psdf('data.tsv.bgz', + sep='\t', n_partitions=5) ``` ### The Feature Selection Spark Data Frame (FSDataFrame) -The `FSDataFrame` (**Figure 1**) is a core functionality of `fsspark`. It is a wrapper around a Spark Data Frame (sdf) +The `FSDataFrame` (**Figure 1**) is a core functionality of `fslite`. It is a wrapper around a Spark Data Frame (sdf) that provides a set of methods to facilitate feature selection tasks. The `FSDataFrame` is initialized with a Spark Data Frame (sdf) or a Pandas on Spark Data Frame (psdf) and two mandatory arguments: `sample_col` and `label_col`. The `sample_col` argument is the name of the column in the sdf that @@ -73,9 +75,9 @@ contains the response variable. 
#### How to create a Feature Selection Spark Data Frame (FSDF) ```python -from fsspark.config.context import init_spark, stop_spark_session -from fsspark.fs.core import FSDataFrame -from fsspark.utils.io import import_table_as_psdf +from fslite.config.context import init_spark, stop_spark_session +from fslite.fs.core import FSDataFrame +from fslite.utils.io import import_table_as_psdf # Init spark init_spark() diff --git a/docs/README.methods.md b/docs/README.methods.md index 5fb42cd..9359f4c 100644 --- a/docs/README.methods.md +++ b/docs/README.methods.md @@ -1,10 +1,10 @@ -# fsspark - features selection methods +# fslite - features selection methods --- -`fsspark `includes a set of methods to perform feature selection and machine learning based on spark. -A typical workflow written using `fsspark` can be divided roughly in four major stages: +`fslite` includes a set of methods to perform feature selection and machine learning based on spark. +A typical workflow written using `fslite` can be divided roughly in four major stages: 1) data pre-processing. 2) univariate filters. @@ -53,4 +53,4 @@ A typical workflow written using `fsspark` can be divided roughly in four major ### 5. 
Feature selection pipeline example -[FS pipeline example](../fsspark/pipeline/fs_pipeline_example.py) +[FS pipeline example](../fslite/pipeline/fs_pipeline_example.py) diff --git a/environment.yml b/environment.yml index 9f48f63..373ccaf 100644 --- a/environment.yml +++ b/environment.yml @@ -1,4 +1,4 @@ -name: fsspark-venv +name: fslite-venv channels: - defaults - conda-forge @@ -6,9 +6,14 @@ dependencies: - python==3.10 - pip - pip: - - setuptools~=65.5.0 - - pyspark~=3.3.0 - - networkx~=2.8.7 - - numpy~=1.23.4 - - pandas~=1.5.1 - - pyarrow~=8.0.0 + - setuptools + - networkx + - numpy + - pyarrow + - pandas + - scipy + - scikit-learn + - psutil + - pytest + - matplotlib + - memory-profiler diff --git a/examples/loom2parquetchunks.py b/examples/loom2parquetchunks.py new file mode 100644 index 0000000..711ef7a --- /dev/null +++ b/examples/loom2parquetchunks.py @@ -0,0 +1,136 @@ +# Import and convert to parquet a single-cell dataset: GSE156793 (loom format) +# GEO URL: +# https://www.ncbi.nlm.nih.gov/geo/download/?acc=GSE156793&format=file&file=GSE156793%5FS3%5Fgene%5Fcount%2Eloom%2Egz + +# import libraries +import pandas as pd +import loompy +import pyarrow.parquet as pq +import pyarrow as pa + +# define the path to the loom file +loom_file = "GSE156793_S3_gene_count.loom" + +# connect to the loom file +ds = loompy.connect(loom_file) + +# get shape of the data +ds.shape + +# retrieve the row attributes +ds.ra.keys() + +# get gene ids +gene_ids = ds.ra["gene_id"] +gene_ids[0:10] + +# get the column attributes +ds.ca.keys() + +# get sample metadata +sample_id = ds.ca["sample"] +cell_cluster = ds.ca["Main_cluster_name"] +assay = ds.ca["Assay"] +development_day = ds.ca["Development_day"] + +# make a dataframe with the sample metadata, define the columns types +sample_df = pd.DataFrame({ + "sample_id": sample_id, + "cell_cluster": cell_cluster, + "assay": assay, + "development_day": development_day, + } +) + +# print the first 5 rows +sample_df.head() + +# Make 
'cell_cluster' a categorical variable encoded as an integer +sample_df["cell_cluster"] = sample_df["cell_cluster"].astype("category") +sample_df["cell_cluster_id"] = sample_df["cell_cluster"].cat.codes + +# Make 'assay' a categorical variable encoded as an integer +sample_df["assay"] = sample_df["assay"].astype("category") +sample_df["assay_id"] = sample_df["assay"].cat.codes + +# Make 'sample_id' the index +sample_df = sample_df.set_index("sample_id") + +# Show the first 5 rows +sample_df.head() + +# Save the sample metadata to parquet +( + sample_df.reset_index().to_parquet( + "sample_metadata.parquet", index=False, engine="auto", compression="gzip" + ) +) + + +# transpose dataset and convert to parquet. +# process the data per chunks. +chunk_size = 10000 +writer = None +count = 0 +number_chunks = 10 # number of chunks to process + +for ix, selection, view in ds.scan(axis=1, batch_size=chunk_size): + # retrieve the chunk + matrix_chunk = view[:, :] + + # transpose the data + matrix_chunk_t = matrix_chunk.T + + # convert to pandas dataframe + df_chunk = pd.DataFrame( + matrix_chunk_t, index=sample_id[selection.tolist()], columns=gene_ids + ) + + # merge chunk with sample metadata + df_chunk = pd.merge( + left=sample_df[["cell_cluster_id", "development_day", "assay_id"]], + right=df_chunk, + how="inner", + left_index=True, + right_index=True, + sort=False, + copy=True, + indicator=False, + validate="one_to_one", + ) + + # reset the index + df_chunk = df_chunk.reset_index() + + # rename the index column + df_chunk = df_chunk.rename(columns={"index": "sample_id"}) + + if writer is None: + # define the schema + schema = pa.schema( + [ + pa.field("sample_id", pa.string()), + pa.field("cell_cluster_id", pa.int8()), + pa.field("development_day", pa.int64()), + pa.field("assay_id", pa.int8()), + ] + + [pa.field(gene_id, pa.float32()) for gene_id in gene_ids] + ) + + print(len(list(df_chunk.columns))) + print(len(schema)) + + # create the parquet writer + writer = 
"""
This file contains a list of constants used in the feature selection and machine learning methods.
"""

from typing import Dict, List, Union

# Registry of the supported feature selection methods, grouped by class
# ("univariate", "multivariate", "ml"). Each class carries a title, a
# description and the list of its methods (name + short description).
FS_METHODS = {
    "univariate": {
        "title": "Univariate Feature Selection",
        "description": "Univariate feature selection refers to the process of selecting the most relevant features for "
        "a machine learning model by evaluating each feature individually with respect to the target "
        "variable using univariate statistical tests. It simplifies the feature selection process by "
        "treating each feature independently and assessing its contribution to the predictive "
        "performance of the model.",
        "methods": [
            {
                "name": "anova",
                "description": "Univariate ANOVA feature selection (f-classification)",
            },
            {"name": "u_corr", "description": "Univariate Pearson's correlation"},
            {"name": "f_regression", "description": "Univariate f-regression"},
            {
                "name": "mutual_info_regression",
                "description": "Univariate mutual information regression",
            },
            {
                "name": "mutual_info_classification",
                "description": "Univariate mutual information classification",
            },
        ],
    },
    "multivariate": {
        "title": "Multivariate Feature Selection",
        "description": "Multivariate feature selection is a method of selecting features by evaluating them in "
        "combination rather than individually. Unlike univariate feature selection, which treats each "
        "feature separately, multivariate feature selection considers the relationships and interactions "
        "between multiple features and the target variable. This method aims to identify a subset of "
        "features that work well together to improve the performance of a machine learning model.",
        "methods": [
            {"name": "m_corr", "description": "Multivariate Correlation"},
            {"name": "variance", "description": "Multivariate Variance"},
        ],
    },
    "ml": {
        "title": "Machine Learning Wrapper",
        # NOTE(review): this description stops mid-sentence; complete it when
        # the user-facing wording is decided.
        "description": "Machine learning wrapper methods are feature selection techniques that use a machine learning ",
        "methods": [
            {"name": "rf_binary", "description": "Random Forest Binary Classifier"},
            {"name": "lsvc_binary", "description": "Linear SVC Binary Classifier"},
            {
                "name": "fm_binary",
                "description": "Factorization Machine Binary Classifier",
            },
            {
                "name": "rf_multilabel",
                "description": "Random Forest Multi-label Classifier",
            },
            {
                "name": "lg_multilabel",
                "description": "Logistic Regression Multi-label Classifier",
            },
            {"name": "rf_regression", "description": "Random Forest Regression"},
            {
                "name": "fm_regression",
                "description": "Factorization Machine Regression",
            },
        ],
    },
}


def get_fs_methods():
    """
    Get the full registry of feature selection methods.

    :return: The ``FS_METHODS`` dictionary itself (not a copy).
    """
    return FS_METHODS


def get_fs_method_details(method_name: str) -> Union[Dict, None]:
    """
    Get the details of a feature selection method, searching every method class.

    The lookup is case-insensitive.

    :param method_name: Name of the method to look up.
    :return: The method entry (``name`` and ``description``), or None when the
             name is unknown or is not a string.
    """
    if not isinstance(method_name, str):
        return None

    wanted = method_name.lower()
    for fs_class in FS_METHODS.values():
        for method in fs_class["methods"]:
            if method["name"].lower() == wanted:
                return method
    return None


def get_fs_univariate_methods() -> List:
    """
    Get the list of univariate methods implemented in the library.

    :return: List of method names.
    """
    return get_fs_method_by_class("univariate")


def get_fs_multivariate_methods() -> List:
    """
    Get the list of multivariate methods implemented in the library.

    :return: List of method names.
    """
    return get_fs_method_by_class("multivariate")


def get_fs_ml_methods() -> List:
    """
    Get the list of machine learning methods implemented in the library.

    :return: List of method names.
    """
    return get_fs_method_by_class("ml")


def _is_valid_method(fs_class: str, method_name: str) -> bool:
    """
    Check, case-insensitively, whether ``method_name`` belongs to ``fs_class``.

    Both sides of the comparison are lower-cased so the check agrees with
    ``get_fs_method_details``. Non-string names are reported as invalid.
    """
    if not isinstance(method_name, str):
        return False
    wanted = method_name.lower()
    return any(name.lower() == wanted for name in get_fs_method_by_class(fs_class))


def is_valid_univariate_method(method_name: str) -> bool:
    """
    Check if the given method name is a supported univariate method (case-insensitive).

    :param method_name: Method name.
    :return: True when the method is supported, False otherwise.
    """
    return _is_valid_method("univariate", method_name)


def is_valid_multivariate_method(method_name: str) -> bool:
    """
    Check if the given method name is a supported multivariate method (case-insensitive).

    :param method_name: Method name.
    :return: True when the method is supported, False otherwise.
    """
    return _is_valid_method("multivariate", method_name)


def is_valid_ml_method(method_name: str) -> bool:
    """
    Check if the given method name is a supported machine learning method (case-insensitive).

    :param method_name: Method name.
    :return: True when the method is supported, False otherwise.
    """
    return _is_valid_method("ml", method_name)


def get_fs_method_by_class(fs_class: str) -> List:
    """
    Get the names of the FS methods supported for a given FS class, for example, univariate.

    :param fs_class: One of the keys of ``FS_METHODS``.
    :return: List of method names, in registry order.
    :raises KeyError: If ``fs_class`` is not a key of ``FS_METHODS``.
    """
    return [method["name"] for method in FS_METHODS[fs_class]["methods"]]
class FSDataFrame:
    """
    FSDataFrame wraps a pandas DataFrame with functionality to perform feature selection.

    Samples are rows and features are columns. On construction the numerical
    feature columns are extracted into a float32 matrix, stored either as a
    dense NumPy array or as a SciPy CSR matrix, while the sample identifiers,
    the raw labels and the integer-encoded labels are kept alongside it.

    Features can be scaled in place and filtered by index (which returns a new
    FSDataFrame). Splitting into training and testing sets is not implemented yet.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        sample_col: Optional[str] = None,
        label_col: Optional[str] = None,
        sparse_threshold: float = 0.7,  # Threshold for sparsity
        memory_threshold: Optional[float] = 0.75,  # Proportion of system memory to use for dense arrays
    ):
        """
        Create an instance of FSDataFrame.

        The input DataFrame should contain 2+N columns. After specifying the sample id and label columns,
        the remaining numerical columns are considered features; non-numerical columns are ignored.
        The feature matrix is stored in a dense or sparse format based on the sparsity of the data
        and the available memory.

        :param df: Input Pandas DataFrame
        :param sample_col: Column name for sample identifiers (optional)
        :param label_col: Column name for labels (required)
        :param sparse_threshold: Threshold for sparsity, default is 70%. If the proportion of zero entries
                                 in the feature matrix exceeds this value, the matrix is stored in a sparse
                                 format unless memory allows a dense one.
        :param memory_threshold: Proportion of the available system memory a dense matrix may use.
                                 It is multiplied by the available memory, so it must be a number.
        :raises ValueError: If the label column is missing or not given, if the sample column is
                            given but not found, or if there are no numerical features.
        """
        # Keep a reference to the input DataFrame (no copy is made here)
        self.__df = df

        # Remember the storage policy so derived instances can reuse it
        self.__sparse_threshold = sparse_threshold
        self.__memory_threshold = memory_threshold

        # Handle sample column
        if sample_col:
            if sample_col not in df.columns:
                raise ValueError(f"Sample column '{sample_col}' not found in DataFrame.")
            self.__sample_col = sample_col
            self.__samples = df[sample_col].tolist()
        else:
            self.__sample_col = None
            self.__samples = []
            logger.info("No sample column specified.")

        # Handle label column
        if label_col is None:
            raise ValueError("A label column is required but was not specified.")
        if label_col not in df.columns:
            raise ValueError(f"Label column '{label_col}' not found in DataFrame.")

        self.__label_col = label_col
        self.__labels = df[label_col].tolist()

        # Encode labels (assume categorical for now)
        label_encoder = LabelEncoder()
        self.__labels_matrix = label_encoder.fit_transform(df[label_col]).tolist()

        # Select only numerical columns, excluding sample_col and label_col
        numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist()
        self.__original_features = [
            col for col in numeric_columns if col not in [sample_col, label_col]
        ]

        # Select only the feature columns directly (no drop)
        numerical_df = df[self.__original_features]

        if numerical_df.empty:
            raise ValueError("No numerical features found in the DataFrame.")

        # Proportion of zero entries in the feature matrix
        sparsity = (numerical_df == 0).sum().sum() / numerical_df.size

        # Estimate memory usage
        dense_matrix_size = numerical_df.memory_usage(deep=True).sum()  # In bytes
        available_memory = psutil.virtual_memory().available  # In bytes

        # Sparse storage is used only when the data is sparse AND a dense
        # matrix would not fit in the allowed share of the available memory.
        use_sparse = False
        if sparsity > sparse_threshold:
            if dense_matrix_size < memory_threshold * available_memory:
                logger.info(
                    f"Data is sparse (sparsity={sparsity:.2f}) but enough memory available. "
                    f"Using a dense matrix."
                )
            else:
                logger.info(
                    f"Data is sparse (sparsity={sparsity:.2f}), memory insufficient for dense matrix. "
                    f"Using a sparse matrix representation."
                )
                use_sparse = True
        else:
            logger.info(f"Data is not sparse (sparsity={sparsity:.2f}), using a dense matrix.")

        dense_matrix = numerical_df.to_numpy(dtype=np.float32)
        self.__matrix = sparse.csr_matrix(dense_matrix) if use_sparse else dense_matrix
        self.__is_sparse = use_sparse

        # (scaled?, scaler method name)
        self.__is_scaled = (False, None)

    def get_feature_matrix(self) -> Union[np.ndarray, sparse.csr_matrix]:
        """
        Return the feature matrix (samples as rows, features as columns).

        :return: Dense NumPy array or SciPy CSR matrix, depending on `is_sparse()`.
        """
        return self.__matrix

    def get_label_vector(self) -> List[int]:
        """
        Return the integer-encoded labels, one per sample.

        :return: List of encoded labels.
        """
        return self.__labels_matrix

    def get_sample_col_name(self) -> Optional[str]:
        """
        Return sample id column name.

        :return: Sample id column name, or None when no sample column was given.
        """
        return self.__sample_col

    def get_label_col_name(self) -> str:
        """
        Return sample label column name.

        :return: Sample label column name.
        """
        return self.__label_col

    def count_features(self) -> int:
        """
        Return the number of features.

        :return: Number of features.
        """
        return self.__matrix.shape[1]

    def count_instances(self) -> int:
        """
        Return the number of samples (instances).

        :return: Number of samples.
        """
        return self.__matrix.shape[0]

    def scale_features(self, scaler_method: str = "standard", **kwargs) -> bool:
        """
        Scale the feature matrix in place using the specified method.

        :param scaler_method: One of: min_max, max_abs, standard or robust.
        :param kwargs: Extra keyword arguments forwarded to the scaler constructor.
        :return: True once the features have been scaled.
        :raises ValueError: If `scaler_method` is not one of the supported methods.
        """
        scalers = {
            "min_max": MinMaxScaler,
            "max_abs": MaxAbsScaler,
            "standard": StandardScaler,
            "robust": RobustScaler,
        }
        if scaler_method not in scalers:
            raise ValueError(
                "`scaler_method` must be one of: min_max, max_abs, standard or robust."
            )
        scaler = scalers[scaler_method](**kwargs)

        # TODO: Scale only the features for now, we have to investigate if we scale categorical variables
        # NOTE(review): some scikit-learn scalers reject sparse input or need
        # centering disabled - confirm before scaling a sparse matrix.
        self.__matrix = scaler.fit_transform(self.__matrix)
        self.__is_scaled = (True, scaler_method)
        return True

    def is_scaled(self):
        """Return True when the features have been scaled."""
        return self.__is_scaled[0]

    def get_scaled_method(self):
        """Return the name of the scaler method used, or None when not scaled."""
        return self.__is_scaled[1]

    def is_sparse(self):
        """Return True when the feature matrix is stored as a sparse matrix."""
        return self.__is_sparse

    def select_features_by_index(
        self, feature_indexes: List[int], keep: bool = True
    ) -> "FSDataFrame":
        """
        Filter features by index and return a new instance of FSDataFrame.

        :param feature_indexes: List of feature column indices.
        :param keep: If True (the default), keep only the given features.
                     If False, remove the given features and keep the rest.
        :return: A new FSDataFrame instance with only the selected features. It reuses the
                 sparsity/memory thresholds of this instance and inherits its scaled state.
        """
        indexes = [int(i) for i in feature_indexes]

        if not keep:
            n_features = len(self.__original_features)
            # Normalise negative indexes so they match positions in range(n_features)
            excluded = {i + n_features if i < 0 else i for i in indexes}
            indexes = [i for i in range(n_features) if i not in excluded]

        # Filter the feature matrix to retain only the selected columns (features)
        updated_matrix = self.__matrix[:, indexes]
        if sparse.issparse(updated_matrix):
            # pandas cannot build a regular DataFrame directly from a SciPy sparse matrix
            updated_matrix = updated_matrix.toarray()

        # Filter the original feature names to retain only the selected ones
        updated_features = [self.__original_features[i] for i in indexes]

        # Create a new DataFrame with the retained features and their names
        updated_df = pd.DataFrame(updated_matrix, columns=updated_features)

        # Reattach the sample column (if it exists)
        if self.__sample_col:
            updated_df[self.__sample_col] = self.__samples

        # Reattach the label column
        updated_df[self.__label_col] = self.__labels

        # Build the new instance with the same storage policy as this one
        selected = FSDataFrame(
            updated_df,
            sample_col=self.__sample_col,
            label_col=self.__label_col,
            sparse_threshold=self.__sparse_threshold,
            memory_threshold=self.__memory_threshold,
        )
        # The selected columns come from the current (possibly scaled) matrix
        selected.__is_scaled = self.__is_scaled
        return selected

    def to_pandas(self) -> DataFrame:
        """
        Return the DataFrame representation of the FSDataFrame.

        Columns are ordered as: sample column (if any), label column, features.
        A sparse feature matrix is converted to a dense array first.

        :return: Pandas DataFrame.
        """
        metadata = {}

        # Reattach the sample column (if it exists)
        if self.__sample_col:
            metadata[self.__sample_col] = self.__samples

        # Reattach the label column
        metadata[self.__label_col] = self.__labels

        # Create a DataFrame from the feature matrix
        matrix = self.__matrix
        if sparse.issparse(matrix):
            matrix = matrix.toarray()
        df_features = pd.DataFrame(matrix, columns=self.__original_features)

        # Concatenate the metadata and the features side by side
        return pd.concat([pd.DataFrame(metadata), df_features], axis=1)

    def split_df(
        self, label_type_cat: bool = True, split_training_factor: float = 0.7
    ) -> Tuple["FSDataFrame", "FSDataFrame"]:
        """
        Split DataFrame into training and test dataset (not implemented yet).

        The intended behaviour is to generate a nearly class-balanced training
        and testing set for both categorical and continuous label input.

        :param label_type_cat: If True (the default), the input label column will be processed as categorical.
                               Otherwise, it will be considered a continuous variable and binarized.
        :param split_training_factor: Proportion of the training set. Usually, a value between 0.6 and 0.8.

        :return: Tuple of FSDataFrames. First element is the training set and second element is the testing set.
        :raises NotImplementedError: Always, until the split is implemented.
        """
        # TODO: implement the stratified split. Raising is preferred over
        # silently returning None, which breaks callers that unpack the tuple.
        raise NotImplementedError("FSDataFrame.split_df is not implemented yet.")
+ + :param fs_method: The feature selection method to be used. + :param kwargs: Additional keyword arguments for the feature selection method. + """ + self.fs_method = fs_method + self.kwargs = kwargs + self.validate_method(fs_method) + + @abstractmethod + def validate_method(self, fs_method: str) -> bool: + """ + Abstract method to validate the feature selection method. + + :param fs_method: The feature selection method to be validated. + """ + return get_fs_method_details(fs_method) is not None + + @abstractmethod + def select_features(self, fsdf: FSDataFrame): + """ + Abstract method to select features using the feature selection method. + + :param fsdf: The data frame on which feature selection is to be performed. + """ + pass + + def get_params(self): + """ + Get the parameters for the feature selection method. + + :return: The parameters as a copy of the kwargs dictionary. + """ + return self.kwargs.copy() + + def set_params(self, **kwargs): + """ + Set the parameters for the feature selection method. + + :param kwargs: The new parameters to be set. + """ + self.kwargs.update(kwargs) + + +class InvalidMethodError(ValueError): + """ + Error raised when an invalid feature selection method is used. + """ + + def __init__(self, message): + super().__init__(f"Invalid feature selection method: {message}") + + +class InvalidDataError(ValueError): + """ + Error raised when an invalid feature selection method is used. + """ + + def __init__(self, message): + super().__init__(f"Invalid data frame: {message}") diff --git a/fslite/fs/ml.py b/fslite/fs/ml.py new file mode 100644 index 0000000..e3cb394 --- /dev/null +++ b/fslite/fs/ml.py @@ -0,0 +1,399 @@ +""" + +A set of pre-defined ML algorithms wrapped with cross-validation approach +for feature selection (e.g., rank by feature importance) and prediction. 
+ +""" + +from typing import Union, Optional, Dict, Any, List + +from fslite.fs.constants import get_fs_ml_methods, is_valid_ml_method +from fslite.fs.fdataframe import FSDataFrame +from fslite.fs.methods import FSMethod, InvalidMethodError, InvalidDataError +from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor +from sklearn.svm import SVC, LinearSVC +from sklearn.linear_model import LogisticRegression +from sklearn.model_selection import GridSearchCV +from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, make_scorer +import pandas as pd + +from fslite.fs.ml import MLCVModel + + +class FSMLMethod(FSMethod): + """ + A class for machine learning feature selection methods. + + Attributes: + fs_method (str): The machine learning method to be used for feature selection. + kwargs (dict): Additional keyword arguments for the feature selection method. + """ + + valid_methods = get_fs_ml_methods() + _ml_model: MLCVModel = None + + def __init__( + self, + fs_method: str, + rfe: bool = False, + rfe_iterations: int = 3, + percent_to_keep: float = 0.90, + **kwargs, + ): + """ + Initialize the machine learning feature selection method with the specified parameters. + + Parameters: + fs_method: The machine learning method to be used for feature selection. + kwargs: Additional keyword arguments for the feature selection method. 
+ """ + + super().__init__(fs_method, **kwargs) + self.validate_method(fs_method) + + # set the estimator, grid and cv parameters (or none if not provided) + self.estimator_params = kwargs.get( + "estimator_params", None + ) # estimator parameters + self.evaluator_params = kwargs.get( + "evaluator_params", None + ) # evaluator parameters + self.grid_params = kwargs.get("grid_params", None) # grid parameters + self.cv_params = kwargs.get("cv_params", None) # cross-validation parameters + + # set the machine learning model + self._ml_model = self._set_ml_model() + + # parameters to control the recursive feature elimination process (rfe) + self.rfe = rfe + self.percent_to_keep = percent_to_keep + self.rfe_iterations = rfe_iterations + + # performance metrics + self.rfe_training_metric: list = ( + [] + ) # performance metrics on training for each rfe iteration + self.training_metric = None # performance metrics on training (final model) + self.testing_metric = None # performance metrics on testing (final model) + + # feature importance + self.feature_scores = None + + def validate_method(self, fs_method: str): + """ + Validate the machine learning method. + + Parameters: + fs_method: The machine learning method to be validated. + """ + + if not is_valid_ml_method(fs_method): + raise InvalidMethodError( + f"Invalid machine learning method: {fs_method}. Accepted methods are {', '.join(self.valid_methods)}" + ) + + def _set_ml_model(self): + """ + Select the machine learning model to be used for feature selection. + + Returns: + The machine learning model. 
+ """ + + model_type = self.fs_method + + self._ml_model = MLCVModel.create_model( + model_type=model_type, + estimator_params=self.estimator_params, + evaluator_params=self.evaluator_params, + grid_params=self.grid_params, + cv_params=self.cv_params, + ) + + return self._ml_model + + def _fit_and_filter(self, df: FSDataFrame) -> FSDataFrame: + + # fit the current machine learning model + self._ml_model.fit(df) + + # get feature scores + feature_scores = self._ml_model.get_feature_scores() + + # get feature based on the (percentile) threshold provided + # expected a dataframe sorted by scores in descending order + selected_features = feature_scores.iloc[ + : int(self.percent_to_keep * len(feature_scores)) + ]["feature_index"] + + return df.select_features_by_index(selected_features, keep=True) + + def select_features(self, fsdf: FSDataFrame) -> FSDataFrame: + """ + Select features using the specified machine learning method. + + Parameters: + fsdf: The data frame on which feature selection is to be performed. + + Returns: + FSDataFrame: The data frame with selected features. + """ + + if fsdf is None or fsdf.count_features() == 0 or fsdf.count_instances() == 0: + raise InvalidDataError( + "The data frame is empty or does not contain any features." + ) + + fsdf = self._fit_and_filter(fsdf) + + # Recursive feature elimination + if self.rfe: + for iteration in range(self.rfe_iterations): + print( + f"RFE: running {iteration + 1} of {self.rfe_iterations} iterations..." 
+ ) + fsdf = self._fit_and_filter(fsdf) + # collect the performance metrics on training for every rfe iteration + self.rfe_training_metric.append( + self._ml_model.get_eval_metric_on_training() + ) + + # get the final performance metric on training + self.training_metric = self._ml_model.get_eval_metric_on_training() + + # get the feature scores after feature selection + self.feature_scores = self._ml_model.get_feature_scores() + + return fsdf + + def get_eval_metric_name(self): + """ + Get the evaluation metric name. + + Returns: + The evaluation metric name. + """ + + if self._ml_model is None: + raise ValueError("No machine learning model is available.") + + return self._ml_model.get_eval_metric_name() + + def get_eval_metric_on_training_rfe(self): + """ + Get the evaluation metric on the training data for each RFE iteration. + + Returns: + The evaluation metric on the training data for each RFE iteration. + """ + if self.rfe_training_metric is None: + raise ValueError( + "No training metric is available. Run the select_features method first." + ) + return self.rfe_training_metric + + def get_eval_metric_on_training(self): + """ + Get the evaluation metric on the training data. + + Returns: + The evaluation metric on the training data. + """ + if self.training_metric is None: + raise ValueError( + "No training metric is available. Run the select_features method first." + ) + return self.training_metric + + def get_eval_metric_on_testing(self, fsdf: FSDataFrame): + """ + Evaluate the machine learning method on the testing data. + + Parameters: + fsdf: The testing data frame on which the machine learning method is to be evaluated. + + Returns: + The evaluation metric on the testing data. + """ + + if fsdf is None or fsdf.count_features() == 0 or fsdf.count_instances() == 0: + raise ValueError( + "The testing data frame is empty or does not contain any features." 
+ ) + + # evaluate the model on the testing data + eval_metric = self._ml_model.get_eval_metric_on_testing(fsdf) + self.testing_metric = eval_metric + + return eval_metric + + def get_feature_scores(self): + """ + Get the feature scores after feature selection. + + Returns: + The feature scores as a pandas DataFrame. + """ + + if self.feature_scores is None: + raise ValueError( + "Feature scores are not available. Run the feature selection method first." + ) + + return self.feature_scores + + def __str__(self): + return f"FSMLMethod(method={self.fs_method}, kwargs={self.kwargs})" + + def __repr__(self): + return self.__str__() + + +class MLCVModel: + """ + A factory class for creating various machine learning models with scikit-learn. + ML models are created using a cross-validator approach for hyperparameter tuning. + """ + + _grid_search: GridSearchCV = None + _best_model: object = None + _fsdf: FSDataFrame = None + + def __init__( + self, + estimator: Union[ + RandomForestClassifier, + RandomForestRegressor, + LinearSVC, + LogisticRegression, + SVC, + ], + scoring: str, + estimator_params: Optional[Dict[str, Any]] = None, + grid_params: Optional[Dict[str, List[Any]]] = None, + cv: int = 5, + ): + """ + Initializes the MLModel with optional estimator, scoring method, and parameter specifications. + """ + self.estimator = estimator + self.scoring = scoring + self.estimator_params = estimator_params + self.grid_params = grid_params + self.cv = cv + + self._initialize_model() + + def _initialize_model(self): + if self.estimator_params: + self.estimator.set_params(**self.estimator_params) + + if self.grid_params: + self._grid_search = GridSearchCV( + estimator=self.estimator, + param_grid=self.grid_params, + scoring=self.scoring, + cv=self.cv, + ) + + def fit(self, fsdf: FSDataFrame) -> "MLCVModel": + """ + Fit the model using the cross-validator. 
+ """ + self._fsdf = fsdf + X, y = self._fsdf.get_features_and_labels() + + if self._grid_search: + self._grid_search.fit(X, y) + self._best_model = self._grid_search.best_estimator_ + else: + self.estimator.fit(X, y) + self._best_model = self.estimator + + return self + + def _get_best_model(self): + if self._best_model is None: + raise ValueError("No model has been fitted. Use fit() to fit the model.") + return self._best_model + + def get_feature_scores(self) -> pd.DataFrame: + """ + Get feature importance scores from the best model. + """ + if not isinstance( + self._best_model, (RandomForestClassifier, RandomForestRegressor) + ): + raise ValueError( + "Feature importance is only available for tree-based models." + ) + + features = self._fsdf.get_feature_names() + importances = self._best_model.feature_importances_ + df = pd.DataFrame({"feature": features, "importance": importances}).sort_values( + by="importance", ascending=False + ) + + return df + + def get_eval_metric_on_training(self) -> float: + """ + Get the evaluation metric on training data from the best model. + """ + X_train, y_train = self._fsdf.get_features_and_labels() + y_pred = self._best_model.predict(X_train) + + if self.scoring == "accuracy": + return accuracy_score(y_train, y_pred) + elif self.scoring == "f1": + return f1_score(y_train, y_pred) + elif self.scoring == "roc_auc": + return roc_auc_score(y_train, y_pred) + else: + raise ValueError("Unsupported scoring method.") + + def get_eval_metric_on_testing(self, test_data: FSDataFrame) -> float: + """ + Get evaluation metric on test data from the trained model. 
+ """ + X_test, y_test = test_data.get_features_and_labels() + y_pred = self._best_model.predict(X_test) + + if self.scoring == "accuracy": + return accuracy_score(y_test, y_pred) + elif self.scoring == "f1": + return f1_score(y_test, y_pred) + elif self.scoring == "roc_auc": + return roc_auc_score(y_test, y_pred) + else: + raise ValueError("Unsupported scoring method.") + + @staticmethod + def create_model( + model_type: str, + estimator_params: Dict[str, Any] = None, + grid_params: Dict[str, List[Any]] = None, + scoring: str = "accuracy", + cv: int = 5, + ) -> "MLCVModel": + """ + Create an ML model based on the model type. + """ + if model_type == "RF_BINARY": + estimator = RandomForestClassifier() + elif model_type == "LSVC_BINARY": + estimator = LinearSVC() + elif model_type == "RF_REGRESSION": + estimator = RandomForestRegressor() + elif model_type == "LOGISTIC_REGRESSION": + estimator = LogisticRegression() + else: + raise ValueError(f"Unsupported model type: {model_type}.") + + return MLCVModel( + estimator=estimator, + scoring=scoring, + estimator_params=estimator_params, + grid_params=grid_params, + cv=cv, + ) diff --git a/fslite/fs/multivariate.py b/fslite/fs/multivariate.py new file mode 100644 index 0000000..43c495d --- /dev/null +++ b/fslite/fs/multivariate.py @@ -0,0 +1,219 @@ +import logging +from typing import List + +import numpy as np +from scipy.stats import spearmanr + +from fslite.fs.constants import ( + get_fs_multivariate_methods, + is_valid_multivariate_method, +) +from fslite.fs.fdataframe import FSDataFrame +from fslite.fs.methods import FSMethod, InvalidMethodError +from fslite.fs.utils import find_maximal_independent_set, percentile_rank + +logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s") +logger = logging.getLogger("FS:MULTIVARIATE") +logger.setLevel(logging.INFO) + + +class FSMultivariate(FSMethod): + """ + The FSMultivariate class is a subclass of the FSMethod class and is used for multivariate + feature 
selection methods. It provides a way to select features using different multivariate methods such as + multivariate correlation and variance. + + Example Usage + ------------- + # Create an instance of FSMultivariate with multivariate_method='m_corr' + fs_multivariate = FSMultivariate(multivariate_method='m_corr') + + # Select features using the multivariate method + selected_features = fs_multivariate.select_features(fsdf) + """ + + valid_methods = get_fs_multivariate_methods() + + def __init__(self, fs_method: str, **kwargs): + """ + Initialize the multivariate feature selection method with the specified parameters. + + Parameters: + fsdf: The data frame on which feature selection is to be performed. + fs_method: The multivariate method to be used for feature selection. + kwargs: Additional keyword arguments for the feature selection method. + """ + + super().__init__(fs_method, **kwargs) + self.validate_method(fs_method) + + def validate_method(self, multivariate_method: str): + """ + Validate the multivariate method. + + Parameters: + multivariate_method: The multivariate method to be validated. + """ + + if not is_valid_multivariate_method(multivariate_method): + raise InvalidMethodError( + f"Invalid multivariate method: " + f"{multivariate_method}. Accepted methods are {', '.join(self.valid_methods)}" + ) + + def select_features(self, fsdf: FSDataFrame): + """ + Select features using the specified multivariate method. + """ + + return self.multivariate_filter( + fsdf, multivariate_method=self.fs_method, **self.kwargs + ) + + def multivariate_filter( + self, fsdf: FSDataFrame, multivariate_method: str = "m_corr", **kwargs + ) -> FSDataFrame: + """ + Filter features after applying a multivariate feature selector method. + + :param fsdf: Input FSDataFrame + :param multivariate_method: Multivariate selector method. + Possible values are 'm_corr' or 'variance'. 
+ + :return: Filtered FSDataFrame + """ + if multivariate_method == "m_corr": + selected_features = multivariate_correlation_selector(fsdf, **kwargs) + elif multivariate_method == "variance": + selected_features = multivariate_variance_selector(fsdf, **kwargs) + logging.info("Variance method not implemented yet.") + else: + raise ValueError( + f"Invalid multivariate method: {multivariate_method}. " + f"Choose one of {get_fs_multivariate_methods()}." + ) + + logger.info(f"Applying multivariate filter {multivariate_method}.") + + return fsdf.select_features_by_index(selected_features) + + def __str__(self): + return f"FSMultivariate(multivariate_method={self.fs_method}, kwargs={self.kwargs})" + + def __repr__(self): + return self.__str__() + + +def multivariate_correlation_selector( + fsdf: FSDataFrame, + selection_mode: str = "strict", + selection_threshold: float = 0.75, + corr_method: str = "pearson", +) -> List[int]: + """ + Compute the correlation matrix among input features and select those below a specified threshold. + + :param fsdf: Input FSDataFrame object. + :param selection_mode: If 'strict' (default), apply hard filtering to remove highly correlated features. + Otherwise, 'approximate' find the maximal independent set of highly correlated + features (experimental). + :param selection_threshold: Minimal correlation threshold to consider two features correlated. + :param corr_method: Correlation method - 'pearson' (default) or 'spearman'. + + :return: List of selected feature indices + """ + # Retrieve the feature matrix + f_matrix = fsdf.get_feature_matrix() + + # Get features indexes from matrix + features_indexes = list(range(f_matrix.shape[1])) + + # Compute correlation matrix + if corr_method == "pearson": + corr_matrix = np.corrcoef(f_matrix, rowvar=False) + elif corr_method == "spearman": + corr_matrix, _ = spearmanr(f_matrix) + else: + raise ValueError( + f"Unsupported correlation method '{corr_method}'. Use 'pearson' or 'spearman'." 
+ ) + + # Get absolute values of correlations to check magnitude + corr_matrix = np.abs(corr_matrix) + + # Find pairs of features with correlation above the threshold + combs_above_cutoff = np.triu(corr_matrix, k=1) > selection_threshold + correlated_pairs = np.column_stack(np.where(combs_above_cutoff)) + + # Set of indices to remove + index_to_remove = set() + if selection_mode == "strict": + # Strict filtering: remove features with higher mean correlations + col_means = np.mean(corr_matrix, axis=1) + for i, j in correlated_pairs: + if col_means[i] > col_means[j]: + index_to_remove.add(i) + else: + index_to_remove.add(j) + elif selection_mode == "approximate": + # Experimental approximate method + index_to_remove = find_maximal_independent_set(correlated_pairs, keep=False) + else: + raise ValueError( + f"Unsupported selection mode '{selection_mode}'. Use 'strict' or 'approximate'." + ) + + # Select feature index to keep + mask = np.ones(len(features_indexes), dtype=bool) + mask[list(index_to_remove)] = False + selected_features = np.array(features_indexes)[mask] + + return selected_features + + +def multivariate_variance_selector( + fsdf: FSDataFrame, selection_mode: str = "k_best", selection_threshold: float = 0.0 +) -> List[int]: + """ + Filter features based on variance threshold. + + :param selection_mode: "percentile" or "k_best" (default). If "percentile", the threshold is a percentile of the + variance distribution. If "k_best", the threshold is the k highest variances. + Default is "k_best" with selection_threshold=0.0 (i.e. remove features with same values + in all samples). + :param fsdf: Input FSDataFrame object. + :param selection_threshold: Minimal variance threshold to keep a feature. + Default is 0.0 (i.e. remove features with same values in all samples). 
+ + :return: List of selected feature indices + """ + + # Retrieve the feature matrix + f_matrix = fsdf.get_feature_matrix() + + # Compute variances (across samples) for each feature + features_variances = np.var(f_matrix, axis=0) + + # print to log variance mean across features + logger.info(f"Mean variance across features: {np.mean(features_variances)}") + print(f"Mean variance across features: {np.mean(features_variances)}") + + if selection_mode == "k_best": + # keep indices of features with variance above the threshold + selected_features = np.where(features_variances > selection_threshold)[0] + elif selection_mode == "percentile": + # compute the percentile rank of variances + variances_pct_rank = percentile_rank(features_variances) + # keep indices of features with variance above the threshold + selected_features = np.where(variances_pct_rank > selection_threshold)[0] + else: + raise ValueError( + f"Unsupported selection mode '{selection_mode}'. Use 'percentile' or 'k_best'." + ) + + logger.info( + f"Feature selection mode: {selection_mode}. \n" + f"Number of features selected: {len(selected_features)}" + ) + + return list(selected_features) diff --git a/fslite/fs/univariate.py b/fslite/fs/univariate.py new file mode 100644 index 0000000..b7ba4be --- /dev/null +++ b/fslite/fs/univariate.py @@ -0,0 +1,217 @@ +import logging +from typing import Dict, List + +import numpy as np +from sklearn.feature_selection import ( + GenericUnivariateSelect, + f_classif, + f_regression, + mutual_info_classif, + mutual_info_regression, +) + +from fslite.fs.constants import get_fs_univariate_methods, is_valid_univariate_method +from fslite.fs.fdataframe import FSDataFrame +from fslite.fs.methods import FSMethod, InvalidMethodError + +logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s") +logger = logging.getLogger("FS:UNIVARIATE") +logger.setLevel(logging.INFO) + + +class FSUnivariate(FSMethod): + """ + A class for univariate feature selection methods. 
+ """ + + valid_methods = get_fs_univariate_methods() + + def __init__(self, fs_method: str, **kwargs): + """ + Initialize the univariate feature selection method with the specified parameters. + + :param fs_method: The univariate method to be used for feature selection. + :param kwargs: Additional keyword arguments for the feature selection method. + """ + super().__init__(fs_method, **kwargs) + self.validate_method(fs_method) + + def validate_method(self, fs_method: str): + """ + Validate the univariate method. + + :param fs_method: The univariate method to be validated. + """ + + if not is_valid_univariate_method(fs_method): + raise InvalidMethodError( + f"Invalid univariate method: {fs_method}. " + f"Accepted methods are {', '.join(self.valid_methods)}" + ) + + def select_features(self, fsdf) -> FSDataFrame: + """ + Select features using the specified univariate method. + + :param fsdf: The data frame on which feature selection is to be performed. + :return fsdf: The data frame with selected features. + """ + + return self.univariate_filter( + fsdf, univariate_method=self.fs_method, **self.kwargs + ) + + def __str__(self): + return f"FSUnivariate(method={self.fs_method}, kwargs={self.kwargs})" + + def __repr__(self): + return self.__str__() + + def univariate_feature_selector( + self, + df: FSDataFrame, + score_func: str = "f_classif", + selection_mode: str = "percentile", + selection_threshold: float = 0.8, + ) -> List[int]: + """ + Wrapper for scikit-learn's `GenericUnivariateSelect` feature selector, supporting multiple scoring functions. + + :param df: Input FSDataFrame + :param score_func: The score function to use for feature selection. Options are: + - 'f_classif': ANOVA F-value for classification tasks. + - 'f_regression': F-value for regression tasks. + - 'mutual_info_classif': Mutual information for classification. + - 'mutual_info_regression': Mutual information for regression. + :param selection_mode: Mode for feature selection (e.g. 
'percentile' or 'k_best'). + :param selection_threshold: The percentage or number of features to select based on the selection mode. + + :return: List of selected feature indices. + """ + # Define the score function based on input + score_func_mapping = { + "f_classif": f_classif, + "f_regression": f_regression, + "mutual_info_classif": mutual_info_classif, + "mutual_info_regression": mutual_info_regression, + } + + if score_func not in score_func_mapping: + raise ValueError( + f"Invalid score_func '{score_func}'. Valid options are: {list(score_func_mapping.keys())}" + ) + + # Extract the score function + selected_score_func = score_func_mapping[score_func] + + # Get the feature matrix and label vector from the FSDataFrame + f_matrix = df.get_feature_matrix() + y = df.get_label_vector() + + # Configure the selector using the provided score function and selection mode/threshold + selector = GenericUnivariateSelect( + score_func=selected_score_func, + mode=selection_mode, + param=selection_threshold, + ) + + # Fit the selector and get only the selected feature indices (not the transformed matrix) + _ = selector.fit_transform(f_matrix, y) + selected_features = selector.get_support(indices=True) + + return list(selected_features) + + def univariate_filter( + self, df: FSDataFrame, univariate_method: str = "u_corr", **kwargs + ) -> FSDataFrame: + """ + Filter features after applying a univariate feature selector method. + + :param df: Input DataFrame + :param univariate_method: Univariate selector method ('u_corr', 'anova', 'f_regression', + 'mutual_info_classification', 'mutual_info_regression') + :return: Filtered DataFrame with selected features + """ + + if not is_valid_univariate_method(univariate_method): + raise NotImplementedError( + "The provided method {} is not implemented !! 
please select one from this list {}".format( + univariate_method, get_fs_univariate_methods() + ) + ) + + selected_features = [] + + if univariate_method == "anova": + selected_features = self.univariate_feature_selector( + df, score_func="f_classif", **kwargs + ) + elif univariate_method == "f_regression": + selected_features = self.univariate_feature_selector( + df, score_func="f_regression", **kwargs + ) + elif univariate_method == "u_corr": + selected_features = univariate_correlation_selector(df, **kwargs) + elif univariate_method == "mutual_info_classification": + selected_features = self.univariate_feature_selector( + df, score_func="mutual_info_classif", **kwargs + ) + elif univariate_method == "mutual_info_regression": + selected_features = self.univariate_feature_selector( + df, score_func="mutual_info_regression", **kwargs + ) + + logger.info( + f"Applying univariate filter using method: {univariate_method} \n" + f" with selection mode: {kwargs.get('selection_mode')} \n" + f" and selection threshold: {kwargs.get('selection_threshold')}" + ) + + if len(selected_features) == 0: + logger.warning("No features selected. Returning original DataFrame.") + return df + else: + logger.info(f"Selected {len(selected_features)} features...") + return df.select_features_by_index(selected_features) + + +def univariate_correlation_selector( + df: FSDataFrame, selection_threshold: float = 0.3 +) -> List[int]: + """ + TODO: Replace this implementation with sci-learn's GenericUnivariateSelect with score_func='f_regression' + Select features based on their correlation with a label (class), if the correlation value is less than the specified + threshold. 
+ + :param df: Input DataFrame + :param selection_threshold: Maximum allowed correlation threshold + + :return: List of selected feature indices + """ + + def compute_univariate_corr(df: FSDataFrame) -> Dict[int, float]: + """ + Compute the correlation coefficient between every column (features) in the input NumPy array and the label (class) + using a dictionary comprehension. + + :param df: Input FSDataFrame + :return: Return dict {feature_index -> corr} + """ + + f_matrix = df.get_feature_matrix() # get the feature matrix + labels = df.get_label_vector() # get the label vector + features_index = range(f_matrix.shape[1]) # get the feature index + + return { + f_index: abs(np.corrcoef(f_matrix[:, f_index], labels)[0, 1]) + for f_index in features_index + } + + correlations = compute_univariate_corr(df) + + selected_features = [ + feature_index + for feature_index, corr in correlations.items() + if corr <= selection_threshold + ] + return selected_features diff --git a/fsspark/fs/utils.py b/fslite/fs/utils.py similarity index 76% rename from fsspark/fs/utils.py rename to fslite/fs/utils.py index 31d4c39..a36a089 100644 --- a/fsspark/fs/utils.py +++ b/fslite/fs/utils.py @@ -2,12 +2,13 @@ from typing import Dict, Tuple, Set import networkx as nx -import pyspark.sql.functions as f +import numpy as np from networkx.algorithms.mis import maximal_independent_set -from pyspark.ml.feature import Imputer +from scipy.stats import rankdata +from sklearn.impute import SimpleImputer -from fsspark.fs.core import FSDataFrame -from fsspark.utils.generic import tag +from fslite.fs.fdataframe import FSDataFrame +from fslite.utils.generic import tag logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s") logger = logging.getLogger("FSSPARK:UTILS") @@ -27,20 +28,21 @@ def compute_missingness_rate(fsdf: FSDataFrame) -> Dict[str, float]: n_instances = fsdf.count_instances() # number of instances/samples. 
features = fsdf.get_features_names() # list of features (column) names - missing_rates = sdf.select( - [ - ( - f.sum(f.when(f.isnan(sdf[c]) | f.isnull(sdf[c]), 1).otherwise(0)) / n_instances - ).alias(c) - for c in features - ] - ) + # missing_rates = sdf.select( + # [ + # ( + # f.sum(f.when(f.isnan(sdf[c]) | f.isnull(sdf[c]), 1).otherwise(0)) + # / n_instances + # ).alias(c) + # for c in features + # ] + # ) - return missing_rates.first().asDict() + # return missing_rates.first().asDict() def remove_features_by_missingness_rate( - fsdf: FSDataFrame, threshold: float = 0.15 + fsdf: FSDataFrame, threshold: float = 0.15 ) -> FSDataFrame: """ Remove features from FSDataFrame with missingness rate higher or equal than a specified threshold. @@ -78,7 +80,7 @@ def impute_missing(fsdf: FSDataFrame, strategy: str = "mean") -> FSDataFrame: col_features = fsdf.get_features_names() sdf_imputed = ( - Imputer() + SimpleImputer() .setStrategy(strategy) .setInputCols(col_features) .setOutputCols(col_features) @@ -107,7 +109,9 @@ def find_maximal_independent_set(pairs: Tuple[int], keep: bool = True) -> Set[in :return: Set of indices (maximal independent set or remaining indices). """ - logger.warning("This method is experimental and have been not extensively tested...") + logger.warning( + "This method is experimental and have been not extensively tested..." + ) graph = nx.Graph() graph.add_edges_from(pairs) @@ -116,3 +120,16 @@ def find_maximal_independent_set(pairs: Tuple[int], keep: bool = True) -> Set[in return set(max_ind_set) else: return set([int(i) for i in graph.nodes if i not in max_ind_set]) + + +# define a function to convert a numerical vector to percentile ranks + + +def percentile_rank(vector: np.array) -> np.array: + """ + Convert a numerical vector to percentile ranks. + + :param vector: Numerical vector. + :return: Vector of percentile ranks. 
+ """ + return np.percentile(vector, np.linspace(0, 100, len(vector))) diff --git a/fslite/pipeline/fs_pipeline_example.py b/fslite/pipeline/fs_pipeline_example.py new file mode 100644 index 0000000..96c1378 --- /dev/null +++ b/fslite/pipeline/fs_pipeline_example.py @@ -0,0 +1,69 @@ +# """ +# Example of a feature selection pipeline implemented in fslite. +# +# After data import and pre-processing, the pipeline applies univariate correlation filter, +# multivariate correlation filter and Randon Forest classification. +# +# """ +# +# from fslite.config.context import init_spark, stop_spark_session +# from fslite.fs.core import FSDataFrame +# +# from fslite.fs.methods import FSPipeline, FSUnivariate, FSMultivariate, FSMLMethod +# from fslite.utils.datasets import get_tnbc_data_path +# from fslite.utils.io import import_table_as_psdf +# +# # Init spark +# init_spark( +# apply_pyarrow_settings=True, +# apply_extra_spark_settings=True, +# apply_pandas_settings=True, +# ) +# +# # 1. Import data +# df = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5) +# fsdf = FSDataFrame(df, sample_col="Sample", label_col="label") +# +# # 2. Split data +# training_data, testing_data = fsdf.split_df(split_training_factor=0.6) +# +# # 3. Set feature selection methods +# # create a Univariate object +# univariate = FSUnivariate( +# fs_method="anova", selection_mode="percentile", selection_threshold=0.8 +# ) +# +# # create a Multivariate object +# multivariate = FSMultivariate( +# fs_method="m_corr", corr_threshold=0.75, corr_method="pearson" +# ) +# +# # create a MLMethod object +# rf_classifier = FSMLMethod( +# fs_method="rf_multilabel", +# rfe=True, +# rfe_iterations=2, +# percent_to_keep=0.9, +# estimator_params={"labelCol": "label"}, +# evaluator_params={"metricName": "accuracy"}, +# grid_params={"numTrees": [10, 15], "maxDepth": [5, 10]}, +# cv_params={"parallelism": 2, "numFolds": 5}, +# ) +# +# # 4. 
Create a pipeline object +# fs_pipeline = FSPipeline( +# df_training=training_data, +# df_testing=testing_data, +# fs_stages=[univariate, multivariate, rf_classifier], +# ) +# +# # 5. Run the pipeline +# results = fs_pipeline.run() +# +# # Print results +# print(f"Accuracy on training: {results['training_metric']}") +# print(f"Accuracy on testing: {results['testing_metric']}") +# print(results.get("feature_scores")) +# +# +# stop_spark_session() diff --git a/fsspark/testdata/TNBC.tsv.gz b/fslite/testdata/TNBC.tsv.gz similarity index 100% rename from fsspark/testdata/TNBC.tsv.gz rename to fslite/testdata/TNBC.tsv.gz diff --git a/fsspark/testdata/TNBC_missing.tsv b/fslite/testdata/TNBC_missing.tsv similarity index 100% rename from fsspark/testdata/TNBC_missing.tsv rename to fslite/testdata/TNBC_missing.tsv diff --git a/fsspark/tests/__init__.py b/fslite/tests/__init__.py similarity index 100% rename from fsspark/tests/__init__.py rename to fslite/tests/__init__.py diff --git a/fslite/tests/generate_big_tests.py b/fslite/tests/generate_big_tests.py new file mode 100644 index 0000000..0efc849 --- /dev/null +++ b/fslite/tests/generate_big_tests.py @@ -0,0 +1,56 @@ +import logging + +import numpy as np +import pandas as pd +import pyarrow as pa +import pyarrow.parquet as pq + + +def generate_large_test_dataset(): + # Parameters for the dataset + n_samples = 1200 + n_features = 10_000 + chunk_size = 100 # Adjust chunk size for memory efficiency + + # Generate sample IDs and labels + sample_ids = np.arange(1, n_samples + 1) + labels = np.random.choice(["LV", "RV", "LA", "RA"], size=n_samples) + + # Parquet schema definition + schema = pa.schema( + [pa.field("sample_id", pa.int32()), pa.field("label", pa.string())] + + [pa.field(f"feature{i}", pa.float32()) for i in range(1, n_features + 1)] + ) + + # Create an empty Parquet file + output_file = "large_dataset_optimized_samples_{}_features_{}.parquet".format( + n_samples, n_features + ) + with pq.ParquetWriter(output_file, 
schema, compression="snappy") as writer: + # Process in chunks to reduce memory usage + for chunk_start in range(0, n_samples, chunk_size): + chunk_end = min(chunk_start + chunk_size, n_samples) + + # Generate chunk of samples and labels + chunk_sample_ids = sample_ids[chunk_start:chunk_end] + chunk_labels = labels[chunk_start:chunk_end] + + # Generate chunk of features + rng = np.random.default_rng() + chunk_features = { + f"feature{i}": rng.random(chunk_end - chunk_start) + for i in range(1, n_features + 1) + } + + # Create DataFrame chunk + chunk_data = {"sample_id": chunk_sample_ids, "label": chunk_labels} + chunk_data.update(chunk_features) + + df_chunk = pd.DataFrame(chunk_data) + + # Convert to PyArrow Table and write chunk to Parquet file + table_chunk = pa.Table.from_pandas(df_chunk, schema=schema) + writer.write_table(table_chunk) + logging.info(f"Processed samples {chunk_start + 1} to {chunk_end}") + + print("Optimized Parquet file created successfully!") diff --git a/fslite/tests/test_fsdataframe.py b/fslite/tests/test_fsdataframe.py new file mode 100644 index 0000000..cda8bce --- /dev/null +++ b/fslite/tests/test_fsdataframe.py @@ -0,0 +1,121 @@ +import gc + +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +from memory_profiler import memory_usage + +from fslite.fs.fdataframe import FSDataFrame + + +def test_initializes_fsdataframe(): + # Create a sample DataFrame + data = { + "sample_id": [1, 2, 3], + "label": ["A", "B", "C"], + "feature1": [0.1, 0.2, 0.3], + "feature2": [1.1, 1.2, 1.3], + } + df = pd.DataFrame(data) + + # Initialize FSDataFrame + fs_df = FSDataFrame(df=df, sample_col="sample_id", label_col="label") + + # Assertions to check if the initialization is correct + assert isinstance(fs_df, FSDataFrame) + + assert fs_df.get_sample_col_name() == "sample_id" + + +def test_scaler_df(): + # Create a sample DataFrame + data = { + "sample_id": [1, 2, 3], + "label": ["A", "B", "C"], + "feature1": [0.1, 0.2, 0.3], + 
"feature2": [1.1, 1.2, 1.3], + } + df = pd.DataFrame(data) + + # Initialize FSDataFrame + fs_df = FSDataFrame(df=df, sample_col="sample_id", label_col="label") + + # Scale the DataFrame + fs_df.scale_features(scaler_method="standard") + + # Assertions to check if the scaling is correct + assert fs_df.is_scaled() == True + assert fs_df.get_scaled_method() == "standard" + + +def test_memory_fsdataframe(): + def create_test_data( + n_samples: int, n_features: int, zero_prob: float = 0.1, nan_prob: float = 0.05 + ): + """Create test data for FSDataFrame.""" + data = np.random.rand(n_samples, n_features) + data[np.random.rand(n_samples, n_features) < zero_prob] = 0 + data[np.random.rand(n_samples, n_features) < nan_prob] = np.nan + + df = pd.DataFrame(data, columns=[f"feature_{i}" for i in range(n_features)]) + df["sample_id"] = [f"sample_{i}" for i in range(n_samples)] + df["label"] = np.random.choice(["A", "B"], n_samples) + return df + + def measure_memory_usage(n_samples: int, n_features: int, nan_prob=0.01) -> float: + """Measure memory usage of FSDataFrame for given number of samples and features.""" + df = create_test_data(n_samples, n_features, nan_prob=nan_prob) + mem_usage = memory_usage( + (FSDataFrame, (df, "sample_id", "label")), max_iterations=1 + )[0] + gc.collect() # Force garbage collection to free memory + return mem_usage + + # Define test cases + feature_sizes = [1000, 5000, 10000] + sample_sizes = [100, 500, 1000] + nan_prob = [0.05, 0.1, 0.2, 0.5] + + # Measure memory usage for each test case + results = [] + for n_samples in sample_sizes: + for n_features in feature_sizes: + for prob in nan_prob: + mem_usage = measure_memory_usage(n_samples, n_features, nan_prob=prob) + results.append( + (n_samples, n_features, prob, mem_usage) + ) # Append prob to results + + # Convert results to DataFrame + results_df = pd.DataFrame( + results, columns=["Samples", "Features", "NAN Prob", "Memory (MB)"] + ) + + # Create 2D line plot + plt.figure(figsize=(12, 8)) 
+ + for feature_size in feature_sizes: + for prob in nan_prob: + data = results_df[ + (results_df["Features"] == feature_size) + & (results_df["NAN Prob"] == prob) + ] + plt.plot( + data["Samples"], + data["Memory (MB)"], + marker="o", + label=f"{feature_size} Features - {prob} NAN Prob", + ) + + plt.xlabel("Number of Samples") + plt.ylabel("Memory Usage (MB)") + plt.title("FSDataFrame Memory Usage") + plt.legend() + plt.xscale("log") # Using log scale for x-axis to better visualize the range + plt.tight_layout() + plt.show() + + # Print results table + print(results_df.to_string(index=False)) + + # Initialize FSDataFrame with DataFrame having sparse numerical features and insufficient memory for dense matrix diff --git a/fslite/tests/test_multivariate_methods.py b/fslite/tests/test_multivariate_methods.py new file mode 100644 index 0000000..ba0f52a --- /dev/null +++ b/fslite/tests/test_multivariate_methods.py @@ -0,0 +1,122 @@ +import pandas as pd + +from fslite.fs.fdataframe import FSDataFrame +from fslite.fs.multivariate import FSMultivariate +from fslite.utils.datasets import get_tnbc_data_path + + +# test multivariate_filter method with 'm_corr' method +def test_multivariate_filter_corr_strict_mode(): + """ + Test multivariate_filter method with 'm_corr' method. 
+ :return: None + """ + + # import tsv as pandas DataFrame + df = pd.read_csv(get_tnbc_data_path(), sep="\t") + + # create FSDataFrame instance + fs_df = FSDataFrame(df=df, sample_col="Sample", label_col="label") + + # create FSMultivariate instance + fs_multivariate = FSMultivariate( + fs_method="m_corr", selection_mode="strict", selection_threshold=0.75 + ) + + fsdf_filtered = fs_multivariate.select_features(fs_df) + + assert fs_df.count_features() == 500 + assert fsdf_filtered.count_features() == 239 + + # Export the filtered DataFrame as Pandas DataFrame + df_filtered = fsdf_filtered.to_pandas() + df_filtered.to_csv("filtered_tnbc_data.csv", index=False) + + +# test multivariate_filter method with 'm_corr' method in approximate mode +def test_multivariate_filter_corr_approximate_mode(): + """ + Test multivariate_filter method with 'm_corr' method in approximate mode. + :return: None + """ + + # import tsv as pandas DataFrame + df = pd.read_csv(get_tnbc_data_path(), sep="\t") + + # create FSDataFrame instance + fs_df = FSDataFrame(df=df, sample_col="Sample", label_col="label") + + # create FSMultivariate instance + fs_multivariate = FSMultivariate( + fs_method="m_corr", selection_mode="approximate", selection_threshold=0.75 + ) + + fsdf_filtered = fs_multivariate.select_features(fs_df) + + assert fs_df.count_features() == 500 + + # test if number of features selected is within the expected range [240-260] + assert 240 <= fsdf_filtered.count_features() <= 260 + + # Export the filtered DataFrame as Pandas DataFrame + df_filtered = fsdf_filtered.to_pandas() + df_filtered.to_csv("filtered_tnbc_data.csv", index=False) + + +# test multivariate_filter method with 'variance' method +def test_multivariate_filter_variance_percentile_mode(): + """ + Test multivariate_filter method with 'variance' method. 
+ :return: None + """ + + # import tsv as pandas DataFrame + df = pd.read_csv(get_tnbc_data_path(), sep="\t") + + # create FSDataFrame instance + fs_df = FSDataFrame(df=df, sample_col="Sample", label_col="label") + + # create FSMultivariate instance + fs_multivariate = FSMultivariate( + fs_method="variance", selection_mode="percentile", selection_threshold=0.2 + ) + + fsdf_filtered = fs_multivariate.select_features(fs_df) + + assert fs_df.count_features() == 500 + assert fsdf_filtered.count_features() == 500 + + # Export the filtered DataFrame as Pandas DataFrame + df_filtered = fsdf_filtered.to_pandas() + df_filtered.to_csv("filtered_tnbc_data.csv", index=False) + + +# test multivariate_filter method with 'variance' method in k_best mode +def test_multivariate_filter_variance_k_best_mode(): + """ + Test multivariate_filter method with 'variance' method in k_best mode. + :return: None + """ + + # import tsv as pandas DataFrame + df = pd.read_csv(get_tnbc_data_path(), sep="\t") + + # create FSDataFrame instance + fs_df = FSDataFrame(df=df, sample_col="Sample", label_col="label") + + # create FSMultivariate instance + fs_multivariate = FSMultivariate( + fs_method="variance", + selection_mode="k_best", + selection_threshold=68100000.0, + # TODO: check this value (should be normalized variance?) 
+ ) + + fsdf_filtered = fs_multivariate.select_features(fs_df) + + assert fs_df.count_features() == 500 + assert fsdf_filtered.count_features() == 87 + + # Export the filtered DataFrame as Pandas DataFrame + df_filtered = fsdf_filtered.to_pandas() + df_filtered.to_csv("filtered_tnbc_data.csv", index=False) diff --git a/fslite/tests/test_univariate_methods.py b/fslite/tests/test_univariate_methods.py new file mode 100644 index 0000000..278393f --- /dev/null +++ b/fslite/tests/test_univariate_methods.py @@ -0,0 +1,171 @@ +import pandas as pd +import psutil + +from fslite.fs.fdataframe import FSDataFrame +from fslite.fs.univariate import FSUnivariate +from fslite.utils.datasets import get_tnbc_data_path + + +def test_univariate_filter_corr(): + """ + Test univariate_filter method with 'u_corr' method. + :return: None + """ + + # import tsv as pandas DataFrame + df = pd.read_csv(get_tnbc_data_path(), sep="\t") + + # create FSDataFrame instance + fs_df = FSDataFrame(df=df, sample_col="Sample", label_col="label") + + # create FSUnivariate instance + fs_univariate = FSUnivariate(fs_method="u_corr", selection_threshold=0.3) + + fsdf_filtered = fs_univariate.select_features(fs_df) + + assert fs_df.count_features() == 500 + assert fsdf_filtered.count_features() == 211 + + # Export the filtered DataFrame as Pandas DataFrame + df_filtered = fsdf_filtered.to_pandas() + df_filtered.to_csv("filtered_tnbc_data.csv", index=False) + +def test_univariate_filter_big_corr(): + # import tsv as pandas DataFrame + df = pd.read_parquet(path="../../examples/GSE156793.parquet") + df.drop(columns=["development_day", "assay_id"], inplace=True) + print(df.shape[1]) + + dense_matrix_size = (df.memory_usage(deep=True).sum() / 1e+6) # In megabytes + available_memory = (psutil.virtual_memory().available / 1e+6) # In megabytes + + # create FSDataFrame instance + fs_df = FSDataFrame(df=df, sample_col="sample_id", label_col="cell_cluster_id") + + # create FSUnivariate instance + fs_univariate = 
FSUnivariate(fs_method="u_corr", selection_threshold=0.3) + + fsdf_filtered = fs_univariate.select_features(fs_df) + + assert fs_df.count_features() == 500 + assert fsdf_filtered.count_features() == 211 + + # Export the filtered DataFrame as Pandas DataFrame + df_filtered = fsdf_filtered.to_pandas() + df_filtered.to_csv("single_cell_output.csv", index=False) + + +# test the univariate_filter method with 'anova' method +def test_univariate_filter_anova(): + """ + Test univariate_filter method with 'anova' method. + :return: None + """ + + # import tsv as pandas DataFrame + df = pd.read_csv(get_tnbc_data_path(), sep="\t") + + # create FSDataFrame instance + fs_df = FSDataFrame(df=df, sample_col="Sample", label_col="label") + + # create FSUnivariate instance + fs_univariate = FSUnivariate( + fs_method="anova", selection_mode="percentile", selection_threshold=0.8 + ) + + fsdf_filtered = fs_univariate.select_features(fs_df) + + assert fs_df.count_features() == 500 + assert fsdf_filtered.count_features() == 4 + + # Export the filtered DataFrame as Pandas DataFrame + df_filtered = fsdf_filtered.to_pandas() + df_filtered.to_csv("filtered_tnbc_data.csv", index=False) + + +# test the univariate_filter method with 'mutual_info_classification' method +def test_univariate_filter_mutual_info_classification(): + """ + Test univariate_filter method with 'mutual_info_classification' method. 
+ :return: None + """ + + # import tsv as pandas DataFrame + df = pd.read_csv(get_tnbc_data_path(), sep="\t") + + # create FSDataFrame instance + fs_df = FSDataFrame(df=df, sample_col="Sample", label_col="label") + + # create FSUnivariate instance + fs_univariate = FSUnivariate( + fs_method="mutual_info_classification", + selection_mode="k_best", + selection_threshold=30, + ) + + fsdf_filtered = fs_univariate.select_features(fs_df) + + assert fs_df.count_features() == 500 + assert fsdf_filtered.count_features() == 30 + + # Export the filtered DataFrame as Pandas DataFrame + df_filtered = fsdf_filtered.to_pandas() + df_filtered.to_csv("filtered_tnbc_data.csv", index=False) + + +# test the univariate_filter method with 'mutual_info_regression' method +def test_univariate_filter_mutual_info_regression(): + """ + Test univariate_filter method with 'mutual_info_regression' method. + :return: None + """ + + # import tsv as pandas DataFrame + df = pd.read_csv(get_tnbc_data_path(), sep="\t") + + # create FSDataFrame instance + fs_df = FSDataFrame(df=df, sample_col="Sample", label_col="label") + + # create FSUnivariate instance + fs_univariate = FSUnivariate( + fs_method="mutual_info_regression", + selection_mode="percentile", + selection_threshold=0.8, + ) + + fsdf_filtered = fs_univariate.select_features(fs_df) + + assert fs_df.count_features() == 500 + assert fsdf_filtered.count_features() == 4 + + # Export the filtered DataFrame as Pandas DataFrame + df_filtered = fsdf_filtered.to_pandas() + df_filtered.to_csv("filtered_tnbc_data.csv", index=False) + + +# test the univariate_filter method with f-regression method +def test_univariate_filter_f_regression(): + """ + Test univariate_filter method with f_regression method. 
+ :return: None + """ + + # import tsv as pandas DataFrame + df = pd.read_csv(get_tnbc_data_path(), sep="\t") + + # create FSDataFrame instance + fs_df = FSDataFrame(df=df, sample_col="Sample", label_col="label") + + # create FSUnivariate instance + fs_univariate = FSUnivariate( + fs_method="f_regression", selection_mode="percentile", selection_threshold=0.8 + ) + + fsdf_filtered = fs_univariate.select_features(fs_df) + + assert fs_df.count_features() == 500 + assert fsdf_filtered.count_features() == 4 + + # Export the filtered DataFrame as Pandas DataFrame + df_filtered = fsdf_filtered.to_pandas() + df_filtered.to_csv("filtered_tnbc_data.csv", index=False) diff --git a/fsspark/fs/__init__.py b/fslite/utils/__init__.py similarity index 100% rename from fsspark/fs/__init__.py rename to fslite/utils/__init__.py diff --git a/fsspark/utils/datasets.py b/fslite/utils/datasets.py similarity index 100% rename from fsspark/utils/datasets.py rename to fslite/utils/datasets.py diff --git a/fsspark/utils/generic.py b/fslite/utils/generic.py similarity index 99% rename from fsspark/utils/generic.py rename to fslite/utils/generic.py index 5674998..9ab70e8 100644 --- a/fsspark/utils/generic.py +++ b/fslite/utils/generic.py @@ -11,10 +11,12 @@ def tag(label: str): :param label: tag label :return: decorator """ + def decorator(func): def wrapper(*args, **kwargs): print(f"Tag for {func.__name__}: {label}") return func(*args, **kwargs) return wrapper + return decorator diff --git a/fslite/utils/io.py b/fslite/utils/io.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/fslite/utils/io.py @@ -0,0 +1 @@ + diff --git a/fsspark/config/context.py b/fsspark/config/context.py deleted file mode 100644 index 2cc5a50..0000000 --- a/fsspark/config/context.py +++ /dev/null @@ -1,59 +0,0 @@ -import os -import pyspark -from pyspark.sql import SparkSession - -from fsspark.config.global_settings import (SPARK_EXTRA_SETTINGS, - PYARROW_SETTINGS, - PANDAS_ON_SPARK_API_SETTINGS) - 
-os.environ['PYARROW_IGNORE_TIMEZONE'] = "1" - - -# os.environ['JAVA_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home" -# os.environ['SPARK_HOME'] = "/usr/local/spark-3.3.0-bin-hadoop3" - -def init_spark(master: str = "local[8]", - apply_pyarrow_settings: bool = True, - apply_extra_spark_settings: bool = True, - apply_pandas_settings: bool = True) -> SparkSession: - """ - Init Spark session. - - :return: Spark session - """ - # stop any current session before starting a new one. - # stop_spark_session() - - # init or get spark session. - spark = (SparkSession.builder - .master(master) - .appName("fs-spark") - ) - - if apply_extra_spark_settings: - # Spark must be configured before starting context. - for k in SPARK_EXTRA_SETTINGS.keys(): - spark = spark.config(k, SPARK_EXTRA_SETTINGS.get(k)) - spark = spark.getOrCreate() - else: - spark = spark.getOrCreate() - - if apply_pyarrow_settings: - [spark.conf.set(k, PYARROW_SETTINGS.get(k)) for k in PYARROW_SETTINGS.keys()] - if apply_pandas_settings: - [spark.conf.set(k, PANDAS_ON_SPARK_API_SETTINGS.get(k)) for k in PANDAS_ON_SPARK_API_SETTINGS.keys()] - - return spark - - -def stop_spark_session() -> None: - """ - If any, stop active Spark Session. - - :return: None - """ - sc = pyspark.sql.SparkSession.getActiveSession() - if sc is not None: - sc.stop() - else: - return None diff --git a/fsspark/config/global_settings.py b/fsspark/config/global_settings.py deleted file mode 100644 index 53675ee..0000000 --- a/fsspark/config/global_settings.py +++ /dev/null @@ -1,22 +0,0 @@ -# Description: Global settings for the fsspark package. -# These settings provide a way to configure the spark session and the spark context to run the fsspark package locally. - -# spark settings to test this module locally. 
-SPARK_EXTRA_SETTINGS = {'spark.executor.memory': '8g', - 'spark.driver.memory': '16g', - "spark.memory.offHeap.enabled": 'true', - "spark.memory.offHeap.size": '4g', - "spark.sql.pivotMaxValues": '100000', - "spark.network.timeout": '100000', - "spark.sql.session.timeZone": "UTC" - } - -# pyarrow settings to make available columnar data processing -PYARROW_SETTINGS = {"spark.sql.execution.arrow.pyspark.enabled": "true", - "spark.sql.execution.arrow.pyspark.fallback.enabled": "true" - } - -# setting for pandas api on spark (PoS) -PANDAS_ON_SPARK_API_SETTINGS = {"compute.default_index_type": "distributed", - "compute.ordered_head": False, - "display.max_rows": 100} diff --git a/fsspark/fs/constants.py b/fsspark/fs/constants.py deleted file mode 100644 index 27eb6e3..0000000 --- a/fsspark/fs/constants.py +++ /dev/null @@ -1,53 +0,0 @@ -# Define constants for the project - - -# Define univariate feature selection methods constants -ANOVA = 'anova' -UNIVARIATE_CORRELATION = 'u_corr' -F_REGRESSION = 'f_regression' - -# Define dict with univariate feature selection methods and brief description -UNIVARIATE_METHODS = { - ANOVA: 'ANOVA univariate feature selection (F-classification)', - UNIVARIATE_CORRELATION: 'Univariate Correlation', - F_REGRESSION: 'Univariate F-regression' -} - -# Define multivariate feature selection methods constants -MULTIVARIATE_CORRELATION = 'm_corr' -MULTIVARIATE_VARIANCE = 'variance' - -# Define dict with multivariate feature selection methods and brief description -MULTIVARIATE_METHODS = { - MULTIVARIATE_CORRELATION: 'Multivariate Correlation', - MULTIVARIATE_VARIANCE: 'Multivariate Variance' -} - -# Define machine learning wrapper methods constants - -# binary classification -RF_BINARY = 'rf_binary' -LSVC_BINARY = 'lsvc_binary' -FM_BINARY = 'fm_binary' # TODO: implement this method - -# multilabel classification -RF_MULTILABEL = 'rf_multilabel' -LR_MULTILABEL = 'lg_multilabel' # TODO: implement this method - -# regression -RF_REGRESSION = 
'rf_regression' -FM_REGRESSION = 'fm_regression' # TODO: implement this method - - -# Define dict with machine learning wrapper methods and brief description -ML_METHODS = { - RF_BINARY: 'Random Forest Binary Classifier', - LSVC_BINARY: 'Linear SVC Binary Classifier', - FM_BINARY: 'Factorization Machine Binary Classifier', - - RF_MULTILABEL: 'Random Forest Multi-label Classifier', - LR_MULTILABEL: 'Logistic Regression Multi-label Classifier', - - RF_REGRESSION: 'Random Forest Regression', - FM_REGRESSION: 'Factorization Machine Regression' -} diff --git a/fsspark/fs/core.py b/fsspark/fs/core.py deleted file mode 100644 index 8c59fce..0000000 --- a/fsspark/fs/core.py +++ /dev/null @@ -1,566 +0,0 @@ -import logging -import numpy as np -from typing import (Union, - Optional, - List, - Set, - Tuple) - -import pyspark.pandas -import pyspark.sql -from pyspark.ml.feature import (VectorAssembler, - StringIndexer, - Binarizer, - MinMaxScaler, - MaxAbsScaler, - StandardScaler, - RobustScaler) -from pyspark.ml.functions import vector_to_array -from pyspark.pandas.series import Series -from pyspark.sql.functions import (monotonically_increasing_id, - col, - rand) - -logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s") -logger = logging.getLogger("FSSPARK") -logger.setLevel(logging.INFO) - - -class FSDataFrame: - """ - FSDataFrame is a representation of a Spark DataFrame with some functionalities to perform feature selection. - An object from FSDataFrame is basically represented by a Spark DataFrame with samples - as rows and features as columns, with extra distributed indexed pandas series for - features names and samples labels. - - An object of FSDataFrame offers an interface to a Spark DataFrame, a Pandas on Spark DataFrame - (e.g. suitable for visualization) or a Spark DataFrame with features as a Dense column vector (e.g. suitable for - applying most algorithms from Spark MLib API). 
- - It can also be split in training and testing dataset and filtered by removing selected features (by name or index). - - [...] - - """ - - def __init__( - self, - df: Union[pyspark.sql.DataFrame, pyspark.pandas.DataFrame], - sample_col: str = None, - label_col: str = None, - row_index_col: Optional[str] = '_row_index', - parse_col_names: bool = False, - parse_features: bool = False, - ): - """ - Create an instance of FSDataFrame. - - Expected an input DataFrame with 2+N columns. - After specifying sample id and sample label columns, the remaining N columns will be considered as features. - - :param df: Spark (or Pandas on Spark) DataFrame - :param sample_col: Sample id column name - :param label_col: Sample label column name - :param row_index_col: Optional. Column name of row indices. - :param parse_col_names: Replace dots (.) in column names with underscores. - :param parse_features: Coerce all features to float. - """ - - self.__df = self._convert_psdf_to_sdf(df) - self.__sample_col = sample_col - self.__label_col = label_col - self.__row_index_name = row_index_col - - # check input dataframe - self._check_df() - - # replace dots in column names, if any. - if parse_col_names: - # TODO: Dots in column names are prone to errors, since dots are used to access attributes from DataFrame. - # Should we make this replacement optional? Or print out a warning? 
- self.__df = self.__df.toDF(*(c.replace('.', '_') for c in self.__df.columns)) - - # If the specified row index column name does not exist, add row index to the dataframe - if self.__row_index_name not in self.__df.columns: - self.__df = self._add_row_index(index_name=self.__row_index_name) - - if parse_features: - # coerce all features to float - non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_name] - feature_cols = [c for c in self.__df.columns if c not in non_features_cols] - self.__df = self.__df.withColumns({c: self.__df[c].cast('float') for c in feature_cols}) - - self.__indexed_features = self._set_indexed_cols() - self.__indexed_instances = self._set_indexed_rows() - - def _check_df(self): - """ - Check if input DataFrame meet the minimal requirements to feed an FS pipeline. - - :return: None - """ - col_names = self.__df.columns - if self.__sample_col not in col_names: - raise DataFormatError(f"Column sample name {self.__sample_col} not found...") - elif self.__label_col not in col_names: - raise DataFormatError(f"Column label name {self.__label_col} not found...") - elif not isinstance(self.__row_index_name, str): - raise DataFormatError("Row index column name must be a valid string...") - else: - pass - - @staticmethod - def _convert_psdf_to_sdf(df: Union[pyspark.pandas.DataFrame, pyspark.sql.DataFrame]) -> pyspark.sql.DataFrame: - """ - Convert Pandas on Spark DataFrame (psdf) to Spark DataFrame (sdf). - - :param df: Spark (or Pandas on Spark) DataFrame - :return: Spark DataFrame - """ - return df.to_spark(index_col=None) if isinstance(df, pyspark.pandas.DataFrame) else df - - def _set_indexed_cols(self) -> pyspark.pandas.series.Series: - """ - Create a distributed indexed Series representing features. - - :return: Pandas on Spark (PoS) Series - """ - # TODO: Check for equivalent to pandas distributed Series in Spark. 
- non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_name] - features = [f for f in self.__df.columns if f not in non_features_cols] - return Series(features) - - def _set_indexed_rows(self) -> pyspark.pandas.series.Series: - """ - Create a distributed indexed Series representing samples labels. - It will use existing row indices, if any. - - :return: Pandas on Spark (PoS) Series - """ - # TODO: Check for equivalent to pandas distributed Series in Spark. - label = self.__df.select(self.__label_col).collect() - row_index = self.__df.select(self.__row_index_name).collect() - return Series(label, index=row_index) - - def get_features_indexed(self) -> pyspark.pandas.series.Series: - """ - Return features names with indices as a Series. - :return: Indexed Series. - """ - return self.__indexed_features - - def get_sample_label_indexed(self) -> pyspark.pandas.series.Series: - """ - Return sample labels with indices as a Series. - :return: Indexed Series. - """ - return self.__indexed_instances - - def get_features_names(self) -> list: - """ - Get features names from DataFrame. - :return: List of features names - """ - return self.__indexed_features.tolist() - - def get_features_by_index(self, indices: Union[List[int], Set[int]]) -> List[str]: - """ - Get features names by specified index from DataFrame. - - :param: indices: List of feature indexes - :return: List of features names - """ - return self.__indexed_features.loc[indices].tolist() - - def get_sample_label(self) -> list: - """ - Get samples class (label) from DataFrame. - :return: List of sample class labels - """ - return self.__indexed_instances.tolist() - - def get_sdf_vector(self, output_column_vector: str = 'features') -> pyspark.sql.DataFrame: - """ - Return a Spark dataframe with feature columns assembled into a column vector (a.k.a. Dense Vector column). - This format is required as input for multiple algorithms from MLlib API. 
- - :param: output_column_vector: Name of the output column vector. - :return: Spark DataFrame - """ - - sdf = self.__df - features_cols = self.get_features_names() - sdf_vector = _assemble_column_vector(sdf, - input_feature_cols=features_cols, - output_column_vector=output_column_vector) - - return sdf_vector - - def get_sdf_and_label(self, - output_column_vector: str = 'features') -> Tuple[pyspark.sql.dataframe.DataFrame, str, str]: - """ - Extracts the Spark DataFrame and label column name from FSDataFrame. - - :param: output_column_vector: Name of the output column vector. - :return: A tuple containing the Spark DataFrame and the label column name. - """ - sdf = self.get_sdf_vector(output_column_vector=output_column_vector) - label_col = self.get_label_col_name() - return sdf, label_col, output_column_vector - - def _collect_features_as_array(self) -> np.array: - """ - Collect features from FSDataFrame as an array. - `Warning`: This method will collect the entire DataFrame into the driver. - Uses this method on small datasets only (e.g., after filtering or splitting the data) - - :return: Numpy array - """ - sdf = self.get_sdf().select(*self.get_features_names()) - a = np.array(sdf.collect()) - return a - - def to_psdf(self) -> pyspark.pandas.DataFrame: - """ - Convert Spark DataFrame to Pandas on Spark DataFrame - :return: Pandas on Spark DataFrame - """ - return self.__df.pandas_api() - - def get_sdf(self) -> pyspark.sql.DataFrame: - """ - Return current Spark DataFrame - :return: Spark DataFrame - """ - return self.__df - - def get_sample_col_name(self) -> str: - """ - Return sample id column name. - - :return: Sample id column name. - """ - return self.__sample_col - - def get_label_col_name(self) -> str: - """ - Return sample label column name. - - :return: Sample label column name. - """ - return self.__label_col - - def get_row_index_name(self) -> str: - """ - Return row (instances) id column name. - - :return: Row id column name. 
- """ - return self.__row_index_name - - def _add_row_index(self, index_name: str = '_row_index') -> pyspark.sql.DataFrame: - """ - Add row indices to DataFrame. - Unique indices of type integer will be added in non-consecutive increasing order. - - :param: index_name: Name of the row index column. - :return: Spark DataFrame with extra column of row indices. - """ - return self.__df.withColumn(index_name, monotonically_increasing_id()) - - def count_features(self) -> int: - """ - Return the number of features. - - :return: Number of features. - """ - return self.get_features_indexed().size - - def count_instances(self) -> int: - """ - Return the number of samples (instances). - - :return: Number of samples. - """ - return self.get_sample_label_indexed().size - - def filter_features(self, features: List[str], keep: bool = True) -> 'FSDataFrame': - """ - Select or drop specified features from DataFrame. - - :param features: List of features names to drop or select from DataFrame - :param keep: If True (default), keep features. Remove otherwise. - - :return: FSDataFrame - """ - - current_features = self.get_features_names() - if len(set(current_features).intersection(features)) == 0: - logger.warning(f"There is no overlap of specified features with the input data frame.\n" - f"Skipping this filter step...") - return self - - count_a = self.count_features() - sdf = self.get_sdf() - - if keep: - sdf = sdf.select( - self.__sample_col, - self.__label_col, - self.__row_index_name, - *features) - else: - sdf = sdf.drop(*features) - - fsdf_filtered = self.update(sdf, self.__sample_col, self.__label_col, self.__row_index_name) - count_b = fsdf_filtered.count_features() - - logger.info(f"{count_b} features out of {count_a} remain after applying this filter...") - - return fsdf_filtered - - def filter_features_by_index(self, feature_indices: Set[int], keep: bool = True) -> 'FSDataFrame': - """ - Select or drop specified features from DataFrame by its indices. 
- - :param feature_indices: Set of features indices to drop or select from DataFrame - :param keep: If True (default), keep features. Remove otherwise. - - :return: FSDataFrame - """ - feature_names = self.get_features_by_index(feature_indices) - return self.filter_features(feature_names, keep=keep) - - def get_label_strata(self) -> list: - """ - Get strata from a categorical column in DataFrame. - - :return: List of levels for categorical variable. - """ - levels = self.get_sample_label_indexed().unique().tolist() - number_of_lvs = len(levels) - if number_of_lvs > 20: # TODO: Check if this is a right cutoff. - logger.warning(f"Number of observed levels too high: {number_of_lvs}.\n" - f"Should this variable be considered continuous?") - return levels - - def scale_features(self, scaler_method: str = 'standard', **kwargs) -> 'FSDataFrame': - """ - Scales features in DataFrame - - :param scaler_method: One of: min_max, max_abs, standard or robust. - :return: FSDataFrame with scaled features. 
- """ - - if scaler_method == 'min_max': - scaler = MinMaxScaler(**kwargs) - elif scaler_method == 'max_abs': - scaler = MaxAbsScaler(**kwargs) - elif scaler_method == 'standard': - scaler = StandardScaler(**kwargs) - elif scaler_method == 'robust': - scaler = RobustScaler(**kwargs) - else: - raise ValueError("`scaler_method` must be one of: min_max, max_abs, standard or robust.") - - features_col_vector = '_features' - scaled_features_vector = '_features_scaled' - - sdf = self.get_sdf_vector(output_column_vector=features_col_vector) - - sdf = (scaler - .setInputCol(features_col_vector) - .setOutputCol(scaled_features_vector) - .fit(sdf) - .transform(sdf) - .drop(features_col_vector) - ) - - sdf = _disassemble_column_vector(sdf, - features_cols=self.get_features_names(), - col_vector_name=scaled_features_vector, - drop_col_vector=True) - - return self.update(sdf, - self.__sample_col, - self.__label_col, - self.__row_index_name) - - def split_df(self, - label_type_cat: bool = True, - split_training_factor: float = 0.7) -> Tuple['FSDataFrame', 'FSDataFrame']: - """ - Split DataFrame into training and test dataset. - It will generate a nearly class-balanced training - and testing set for both categorical and continuous label input. - - :param label_type_cat: If True (the default), the input label colum will be processed as categorical. - Otherwise, it will be considered a continuous variable and binarized. - :param split_training_factor: Proportion of the training set. Usually, a value between 0.6 and 0.8. - - :return: Tuple of FSDataFrames. First element is the training set and second element is the testing set. - """ - - row_index_col = self.get_row_index_name() - label_col = self.get_label_col_name() - sdf = self.__df - - # create a temporal indexed categorical variable for sampling and splitting the data set. 
- tmp_label_col = '_tmp_label_indexed' - if label_type_cat: - sdf = _string_indexer(sdf=sdf, input_col=label_col, output_col=tmp_label_col) - else: - # If the input label is continuous, create a uniform random distribution [0,1] and binarize this variable. - # It will be used then as categorical for sampling the dataframe. - sdf = sdf.withColumn("_tmp_uniform_rand", rand()) - sdf = (_binarizer(sdf, - input_col="_tmp_uniform_rand", - output_col=tmp_label_col, - threshold=0.5, - drop_input_col=True) - ) - - # Get number of levels for categorical variable. - levels = [lv[tmp_label_col] for lv in sdf.select([tmp_label_col]).distinct().collect()] - - # Sampling DataFrame to extract class-balanced training set. - # This will keep similar proportion by stratum in both training and testing set. - fraction_dict = dict(zip(levels, [split_training_factor] * len(levels))) - training_df = sdf.sampleBy(col=sdf[tmp_label_col], fractions=fraction_dict) - - # Filter out the testing set from the input Dataframe. testing_df = input_sdf[-training_df]. - testing_df = sdf.join(training_df, [row_index_col], "leftanti") - - # Drop tmp cols - training_df = training_df.drop(tmp_label_col) - testing_df = testing_df.drop(tmp_label_col) - - return (self.update(training_df, self.__sample_col, self.__label_col, self.__row_index_name), - self.update(testing_df, self.__sample_col, self.__label_col, self.__row_index_name)) - - @classmethod - def update(cls, - df: pyspark.sql.DataFrame, - sample_col: str, - label_col: str, - row_index_col: str): - """ - Create a new instance of FSDataFrame. - - :param df: Spark DataFrame - :param sample_col: Name of sample id column. - :param label_col: Name of sample label column. - :param row_index_col: Name of row (instances) id column. 
- - :return: FSDataFrame - """ - return cls(df, sample_col, label_col, row_index_col) - - -def _assemble_column_vector(sdf: pyspark.sql.DataFrame, - input_feature_cols: List[str], - output_column_vector: str = 'features', - drop_input_cols: bool = True) -> pyspark.sql.DataFrame: - """ - Assemble features (columns) from DataFrame into a column of type Dense Vector. - - :param drop_input_cols: - :param sdf: Spark DataFrame - :param input_feature_cols: List of features column names. - :param output_column_vector: Output column of type DenseVector. - - :return: Spark DataFrame - """ - - sdf_vector = (VectorAssembler() - .setInputCols(input_feature_cols) - .setOutputCol(output_column_vector) - .transform(sdf) - ) - - return sdf_vector.drop(*input_feature_cols) if drop_input_cols else sdf_vector - - -def _disassemble_column_vector(sdf: pyspark.sql.DataFrame, - features_cols: List[str], - col_vector_name: str, - drop_col_vector: bool = True) -> pyspark.sql.DataFrame: - """ - Convert a Column Dense Vector in Spark DataFrame to individual columns (a.k.a features). - Basically, revert the operation from `_assemble_column_vector`. - - :param drop_col_vector: - :param sdf: Spark DataFrame - :param features_cols: - :param col_vector_name: - - :return: Spark DataFrame - """ - - sdf = (sdf - .withColumn("_array_col", vector_to_array(sdf[col_vector_name])) - .withColumns({features_cols[i]: col("_array_col")[i] for i in range(len(features_cols))}) - .drop("_array_col") - ) - - return sdf.drop(col_vector_name) if drop_col_vector else sdf - - -def _string_indexer(sdf: pyspark.sql.DataFrame, - input_col: str = None, - output_col: str = "_label_indexed", - drop_input_col: bool = False) -> pyspark.sql.DataFrame: - """ - Wrapper for `pyspark.ml.feature.StringIndexer`. - See https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html. - - :param sdf: Spark DataFrame. - :param input_col: Name of the input column to be indexed. 
- :param output_col: Name of the output column indexed. - :param drop_input_col: Drop input column after indexing. Default: False. - - :return: Spark DataFrame - """ - sdf = (StringIndexer() - .setInputCol(input_col) - .setOutputCol(output_col) - .fit(sdf) - .transform(sdf) - ) - return sdf.drop(input_col) if drop_input_col else sdf - - -def _binarizer(sdf: pyspark.sql.DataFrame, - input_col: str = None, - output_col: str = "_label_binarized", - threshold: float = 0.5, - drop_input_col: bool = False) -> pyspark.sql.DataFrame: - """ - Wrapper for `pyspark.ml.feature.Binarizer`. - See https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Binarizer.html - - :param sdf: Spark DataFrame. - :param input_col: Name of the numeric input column to be binarized. - :param output_col: Name of the output column binarized. - :param threshold: Threshold used to binarize continuous features. - The features greater than the threshold will be binarized to 1.0. - The features equal to or less than the threshold will be binarized to 0.0 - :param drop_input_col: Drop input column after binarizing. Default: False. - - :return: Spark DataFrame - """ - sdf = (Binarizer() - .setInputCol(input_col) - .setOutputCol(output_col) - .setThreshold(threshold) - .transform(sdf) - ) - - return sdf.drop(input_col) if drop_input_col else sdf - - -class DataFormatError(Exception): - """ - Exception raised for errors in the input/output data format. 
- """ - pass diff --git a/fsspark/fs/methods.py b/fsspark/fs/methods.py deleted file mode 100644 index d9ce31b..0000000 --- a/fsspark/fs/methods.py +++ /dev/null @@ -1,551 +0,0 @@ -from abc import ABC, abstractmethod -from typing import List, Type, Union, Tuple, Optional, Dict, Any - -from fsspark.fs.constants import (ML_METHODS, UNIVARIATE_METHODS, - MULTIVARIATE_METHODS) -from fsspark.fs.core import FSDataFrame -from fsspark.fs.ml import MLCVModel -from fsspark.fs.multivariate import multivariate_filter -from fsspark.fs.univariate import univariate_filter - - -class FSMethod(ABC): - """ - A general class for feature selection methods. - """ - - valid_methods: Tuple[str] - - def __init__(self, - fs_method, - **kwargs): - """ - Initialize the feature selection method with the specified parameters. - - :param fs_method: The feature selection method to be used. - :param kwargs: Additional keyword arguments for the feature selection method. - """ - self.fs_method = fs_method - self.kwargs = kwargs - self.validate_method(fs_method) - - @property - def valid_methods(self): - """ - Get the valid methods for feature selection. - - :return: A tuple of valid methods. - """ - return tuple(self.valid_methods) - - @abstractmethod - def validate_method(self, fs_method: str): - """ - Abstract method to validate the feature selection method. - - :param fs_method: The feature selection method to be validated. - """ - pass - - @abstractmethod - def select_features(self, fsdf: FSDataFrame): - """ - Abstract method to select features using the feature selection method. - - :param fsdf: The data frame on which feature selection is to be performed. - """ - pass - - @abstractmethod - def validate_params(self, **kwargs): - """ - Abstract method to validate the parameters for the feature selection method. - - :param kwargs: The parameters to be validated. - """ - pass - - def get_params(self): - """ - Get the parameters for the feature selection method. 
- - :return: The parameters as a copy of the kwargs dictionary. - """ - return self.kwargs.copy() - - def set_params(self, **kwargs): - """ - Set the parameters for the feature selection method. - - :param kwargs: The new parameters to be set. - """ - self.kwargs.update(kwargs) - - -class FSUnivariate(FSMethod): - """ - A class for univariate feature selection methods. - - Attributes: - fs_method (str): The univariate method to be used for feature selection. - kwargs (dict): Additional keyword arguments for the feature selection method. - """ - - valid_methods = list(UNIVARIATE_METHODS.keys()) - - def __init__(self, fs_method: str, **kwargs): - """ - Initialize the univariate feature selection method with the specified parameters. - - Parameters: - fs_method: The univariate method to be used for feature selection. - kwargs: Additional keyword arguments for the feature selection method. - """ - - super().__init__(fs_method, **kwargs) - self.validate_method(fs_method) - - def validate_method(self, fs_method: str): - """ - Validate the univariate method. - - Parameters: - fs_method: The univariate method to be validated. - """ - - if fs_method not in self.valid_methods: - raise InvalidMethodError( - f"Invalid univariate method: {fs_method}. " - f"Accepted methods are {', '.join(self.valid_methods)}" - ) - - def validate_params(self, **kwargs): - """ - Validate the parameters for the univariate method. - - Parameters: - kwargs: The parameters to be validated. - """ - # Additional validation is done directly in the underlying feature selection method - pass - - def select_features(self, fsdf) -> FSDataFrame: - """ - Select features using the specified univariate method. - - Parameters: - fsdf: The data frame on which feature selection is to be performed. - - Returns: - The selected features. 
- """ - - return univariate_filter( - fsdf, univariate_method=self.fs_method, **self.kwargs - ) - - def __str__(self): - return f"FSUnivariate(method={self.fs_method}, kwargs={self.kwargs})" - - def __repr__(self): - return self.__str__() - - -class FSMultivariate(FSMethod): - """ - The FSMultivariate class is a subclass of the FSMethod class and is used for multivariate - feature selection methods. It provides a way to select features using different multivariate methods such as - multivariate correlation and variance. - - Example Usage - ------------- - # Create an instance of FSMultivariate with multivariate_method='m_corr' - fs_multivariate = FSMultivariate(multivariate_method='m_corr') - - # Select features using the multivariate method - selected_features = fs_multivariate.select_features(fsdf) - """ - - valid_methods = list(MULTIVARIATE_METHODS.keys()) - - def __init__(self, fs_method: str, **kwargs): - """ - Initialize the multivariate feature selection method with the specified parameters. - - Parameters: - fsdf: The data frame on which feature selection is to be performed. - fs_method: The multivariate method to be used for feature selection. - kwargs: Additional keyword arguments for the feature selection method. - """ - - super().__init__(fs_method, **kwargs) - self.validate_method(fs_method) - - def validate_method(self, multivariate_method: str): - """ - Validate the multivariate method. - - Parameters: - multivariate_method: The multivariate method to be validated. - """ - - if multivariate_method not in self.valid_methods: - raise InvalidMethodError( - f"Invalid multivariate method: " - f"{multivariate_method}. Accepted methods are {', '.join(self.valid_methods)}" - ) - - def validate_params(self, **kwargs): - """ - Validate the parameters for the multivariate method. - - Parameters: - kwargs: The parameters to be validated. 
- """ - # Additional validation is done directly in the underlying feature selection method - pass - - def select_features(self, fsdf: FSDataFrame): - """ - Select features using the specified multivariate method. - """ - - return multivariate_filter( - fsdf, multivariate_method=self.fs_method, **self.kwargs - ) - - def __str__(self): - return f"FSMultivariate(multivariate_method={self.fs_method}, kwargs={self.kwargs})" - - def __repr__(self): - return self.__str__() - - -class FSMLMethod(FSMethod): - """ - A class for machine learning feature selection methods. - - Attributes: - fs_method (str): The machine learning method to be used for feature selection. - kwargs (dict): Additional keyword arguments for the feature selection method. - """ - - valid_methods = list(ML_METHODS.keys()) - _ml_model: MLCVModel = None - - def __init__(self, - fs_method: str, - rfe: bool = False, - rfe_iterations: int = 3, - percent_to_keep: float = 0.90, - **kwargs): - """ - Initialize the machine learning feature selection method with the specified parameters. - - Parameters: - fs_method: The machine learning method to be used for feature selection. - kwargs: Additional keyword arguments for the feature selection method. 
- """ - - super().__init__(fs_method, **kwargs) - self.validate_method(fs_method) - - # set the estimator, grid and cv parameters (or none if not provided) - self.estimator_params = kwargs.get('estimator_params', None) # estimator parameters - self.evaluator_params = kwargs.get('evaluator_params', None) # evaluator parameters - self.grid_params = kwargs.get('grid_params', None) # grid parameters - self.cv_params = kwargs.get('cv_params', None) # cross-validation parameters - - # set the machine learning model - self._ml_model = self._set_ml_model() - - # parameters to control the recursive feature elimination process (rfe) - self.rfe = rfe - self.percent_to_keep = percent_to_keep - self.rfe_iterations = rfe_iterations - - # performance metrics - self.rfe_training_metric: list = [] # performance metrics on training for each rfe iteration - self.training_metric = None # performance metrics on training (final model) - self.testing_metric = None # performance metrics on testing (final model) - - # feature importance - self.feature_scores = None - - def validate_method(self, fs_method: str): - """ - Validate the machine learning method. - - Parameters: - fs_method: The machine learning method to be validated. - """ - - if fs_method not in self.valid_methods: - raise InvalidMethodError( - f"Invalid machine learning method: {fs_method}. Accepted methods are {', '.join(self.valid_methods)}" - ) - - def validate_params(self, **kwargs): - """ - Validate the parameters for the machine learning method. - - Parameters: - kwargs: The parameters to be validated. - """ - # Additional validation is done directly in the underlying feature selection method - pass - - def _set_ml_model(self): - """ - Select the machine learning model to be used for feature selection. - - Returns: - The machine learning model. 
- """ - - model_type = self.fs_method - - self._ml_model = MLCVModel.create_model( - model_type=model_type, - estimator_params=self.estimator_params, - evaluator_params=self.evaluator_params, - grid_params=self.grid_params, - cv_params=self.cv_params - ) - - return self._ml_model - - def _fit_and_filter(self, df: FSDataFrame) -> FSDataFrame: - - # fit the current machine learning model - self._ml_model.fit(df) - - # get feature scores - feature_scores = self._ml_model.get_feature_scores() - - # get feature based on the (percentile) threshold provided - # expected a dataframe sorted by scores in descending order - selected_features = feature_scores.iloc[ - :int(self.percent_to_keep * len(feature_scores)) - ]['feature_index'] - - return df.filter_features_by_index(selected_features, keep=True) - - def select_features(self, fsdf: FSDataFrame) -> FSDataFrame: - """ - Select features using the specified machine learning method. - - Parameters: - fsdf: The data frame on which feature selection is to be performed. - - Returns: - FSDataFrame: The data frame with selected features. - """ - - if fsdf is None or fsdf.count_features() == 0 or fsdf.count_instances() == 0: - raise ValueError("The data frame is empty or does not contain any features.") - - fsdf = self._fit_and_filter(fsdf) - - # Recursive feature elimination - if self.rfe: - for iteration in range(self.rfe_iterations): - print(f"RFE: running {iteration + 1} of {self.rfe_iterations} iterations...") - fsdf = self._fit_and_filter(fsdf) - # collect the performance metrics on training for every rfe iteration - self.rfe_training_metric.append(self._ml_model.get_eval_metric_on_training()) - - # get the final performance metric on training - self.training_metric = self._ml_model.get_eval_metric_on_training() - - # get the feature scores after feature selection - self.feature_scores = self._ml_model.get_feature_scores() - - return fsdf - - def get_eval_metric_name(self): - """ - Get the evaluation metric name. 
- - Returns: - The evaluation metric name. - """ - - if self._ml_model is None: - raise ValueError("No machine learning model is available.") - - return self._ml_model.get_eval_metric_name() - - def get_eval_metric_on_training_rfe(self): - """ - Get the evaluation metric on the training data for each RFE iteration. - - Returns: - The evaluation metric on the training data for each RFE iteration. - """ - if self.rfe_training_metric is None: - raise ValueError("No training metric is available. Run the select_features method first.") - return self.rfe_training_metric - - def get_eval_metric_on_training(self): - """ - Get the evaluation metric on the training data. - - Returns: - The evaluation metric on the training data. - """ - if self.training_metric is None: - raise ValueError("No training metric is available. Run the select_features method first.") - return self.training_metric - - def get_eval_metric_on_testing(self, fsdf: FSDataFrame): - """ - Evaluate the machine learning method on the testing data. - - Parameters: - fsdf: The testing data frame on which the machine learning method is to be evaluated. - - Returns: - The evaluation metric on the testing data. - """ - - if fsdf is None or fsdf.count_features() == 0 or fsdf.count_instances() == 0: - raise ValueError("The testing data frame is empty or does not contain any features.") - - # evaluate the model on the testing data - eval_metric = self._ml_model.get_eval_metric_on_testing(fsdf) - self.testing_metric = eval_metric - - return eval_metric - - def get_feature_scores(self): - """ - Get the feature scores after feature selection. - - Returns: - The feature scores as a pandas DataFrame. - """ - - if self.feature_scores is None: - raise ValueError("Feature scores are not available. 
Run the feature selection method first.") - - return self.feature_scores - - def __str__(self): - return f"FSMLMethod(method={self.fs_method}, kwargs={self.kwargs})" - - def __repr__(self): - return self.__str__() - - -class FSPipeline: - """ - The FSPipeline class creates a pipeline of feature selection methods. It provides a way to - chain multiple feature selection methods together to create a pipeline of feature selection methods. - - Example Usage - ------------- - # Create an instance of FSPipeline with the specified feature selection methods - fs_pipeline = FSPipeline(fs_methods=[FSUnivariate('anova'), FSMultivariate('m_corr')]) - - # Select features using the pipeline - selected_features = fs_pipeline.select_features(fsdf) - """ - - _valid_methods: List[Type[Union[FSUnivariate, FSMultivariate, FSMLMethod]]] = [FSUnivariate, - FSMultivariate, - FSMLMethod] - - def __init__(self, - df_training: FSDataFrame, - df_testing: Optional[FSDataFrame], - fs_stages: List[Union[FSUnivariate, FSMultivariate, FSMLMethod]]): - """ - Initialize the feature selection pipeline with the specified feature selection methods. - - Parameters: - df_training: The training data frame on which the feature selection pipeline is to be run. - df_testing: The testing data frame on which the ML wrapper method (if any) is to be evaluated. - fs_stages: A list of feature selection methods to be used in the pipeline. - """ - - self.df_training = df_training - self.df_testing = df_testing - self.fs_stages = fs_stages - self.validate_methods() - - self.pipeline_results = {} - - def validate_methods(self): - """ - Validate the feature selection methods in the pipeline. 
- """ - # check if the pipeline contains at least one feature selection method - if len(self.fs_stages) == 0: - raise ValueError("The pipeline must contain at least one feature selection method.") - - # check if the feature selection methods are valid - if not all(isinstance(method, tuple(self._valid_methods)) for method in self.fs_stages): - raise InvalidMethodError(f"Invalid feature selection method. " - f"Accepted methods are {', '.join([str(m) for m in self._valid_methods])}") - - # check if only one ML method is used in the pipeline - ml_methods = [method for method in self.fs_stages if isinstance(method, FSMLMethod)] - if len(ml_methods) > 1: - raise ValueError("Only one ML method is allowed in the pipeline.") - - def run(self) -> Dict[str, Any]: - """ - Run the feature selection pipeline. - - Returns: - A dictionary with the results of the feature selection pipeline. - """ - - # apply each feature selection method in the pipeline sequentially - n_stages = len(self.fs_stages) - fsdf_tmp = self.df_training - - self.pipeline_results.update(n_stages=n_stages) - - for i, method in enumerate(self.fs_stages): - print(f"Running stage {i + 1} of {n_stages} of the feature selection pipeline: {method}") - if isinstance(method, FSMLMethod): - - fsdf_tmp = method.select_features(fsdf_tmp) - - # collect the results during the feature selection process (rfe iterations, feature scores, etc.) 
- self.pipeline_results.update(rfe_iterations=method.rfe_iterations) - self.pipeline_results.update(feature_scores=method.get_feature_scores()) - self.pipeline_results.update(eval_metric=method.get_eval_metric_name()) - self.pipeline_results.update(rfe_training_metric=method.get_eval_metric_on_training_rfe()) - self.pipeline_results.update(training_metric=method.get_eval_metric_on_training()) - - if self.df_testing is not None: - - # evaluate the final model on the testing data (if available) - testing_metric = method.get_eval_metric_on_testing(self.df_testing) - self.pipeline_results.update(testing_metric=testing_metric) - - else: - fsdf_tmp = method.select_features(fsdf_tmp) - - self.pipeline_results.update(n_initial_features=self.df_training.count_features()) - self.pipeline_results.update(n_selected_features=fsdf_tmp.count_features()) - - return self.pipeline_results - - def __str__(self): - return f"FSPipeline(fs_methods={self.fs_stages})" - - def __repr__(self): - return self.__str__() - - -class InvalidMethodError(Exception): - """ - Error raised when an invalid feature selection method is used. - """ - - def __init__(self, message): - super().__init__(message) diff --git a/fsspark/fs/ml.py b/fsspark/fs/ml.py deleted file mode 100644 index 42b51cc..0000000 --- a/fsspark/fs/ml.py +++ /dev/null @@ -1,388 +0,0 @@ -""" - -A set of pre-defined ML algorithms wrapped with cross-validation approach -for feature selection (e.g., rank by feature importance) and prediction. 
- -""" -import warnings -from typing import List, Any, Dict, Optional, Union - -import pandas as pd -from pyspark.ml import Estimator, Model -from pyspark.ml.classification import (RandomForestClassificationModel, - LinearSVCModel, - RandomForestClassifier, - LinearSVC, LogisticRegression, LogisticRegressionModel) -from pyspark.ml.evaluation import (Evaluator, - BinaryClassificationEvaluator, - MulticlassClassificationEvaluator, - RegressionEvaluator) -from pyspark.ml.regression import RandomForestRegressionModel, RandomForestRegressor -from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel, Param - -from fsspark.fs.constants import (RF_BINARY, - LSVC_BINARY, - FM_BINARY, - RF_MULTILABEL, - LR_MULTILABEL, - RF_REGRESSION, - FM_REGRESSION, - ML_METHODS) -from fsspark.fs.core import FSDataFrame - -ESTIMATORS_CLASSES = [RandomForestClassifier, RandomForestRegressionModel, LinearSVC, LogisticRegression] -EVALUATORS_CLASSES = [BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator] - - -# Define an abstract class that allow to create a factory of models -# with the same interface -# This class allows to perform the following operations: -# - Define an Estimator -# - Define an Evaluator -# - Define a grid of parameters (model tuning) -# - Define a cross-validator (model fitting) -class MLCVModel: - """ - A factory class for creating various machine learning models with Spark MLlib. - ML model are created using a cross-validator approach for hyperparameter tuning. 
- """ - - _cross_validator: CrossValidator = None - _fitted_cv_model: CrossValidatorModel = None - _best_model: Model = None - _fsdf: FSDataFrame = None - - def __init__(self, - estimator: Union[RandomForestClassifier | - RandomForestRegressionModel | - LinearSVC | - LogisticRegression], - evaluator: Union[BinaryClassificationEvaluator | - MulticlassClassificationEvaluator | - RegressionEvaluator], - estimator_params: Optional[Dict[str, Any]] = None, - evaluator_params: Optional[Dict[str, Any]] = None, - grid_params: Optional[Dict[str, List[Any]]] = None, - cv_params: Optional[Dict[str, Any]] = None): - """ - Initializes the MLModel with optional estimator, evaluator, and parameter specifications. - """ - self.estimator = estimator - self.evaluator = evaluator - self.estimator_params = estimator_params - self.evaluator_params = evaluator_params - self.grid_params = grid_params - self.cv_params = cv_params - - self._initialize_model() - - def _initialize_model(self): - # Validate and set estimator parameters - if self.estimator: - self._validate_estimator(self.estimator) - self._validate_estimator_params(self.estimator_params) - self._set_estimator_params() - - # Validate and evaluator - if self.evaluator: - self._validate_evaluator(self.evaluator) - self._validate_evaluator_params(self.evaluator_params) - self._set_evaluator_params() - - # Parse and set grid parameters - if self.grid_params: - self.grid_params = self._parse_grid_params(self.grid_params) - - # Initialize and set cross-validator parameters - self._set_cross_validator() - - def _parse_grid_params(self, grid_params: Dict[str, List[Any]]) -> List[Dict[Param, Any]]: - """ - Parse the grid parameters to create a list of dictionaries. - - :param grid_params: A dictionary containing the parameter names as keys and a list of values as values. - :return: A list of dictionaries, where each dictionary represents a set of parameter values. 
- """ - grid = ParamGridBuilder() - for param, values in grid_params.items(): - if hasattr(self.estimator, param): - grid = grid.addGrid(getattr(self.estimator, param), values) - else: - raise AttributeError(f"{self.estimator.__class__.__name__} does not have attribute {param}") - return grid.build() - - def _validate_estimator(self, estimator: Estimator) -> 'MLCVModel': - """ - Validate the estimator. - - :param estimator: The estimator to validate. - :return: The validated estimator. - """ - # check estimator is an instance of ESTIMATORS_CLASSES - if not isinstance(estimator, tuple(ESTIMATORS_CLASSES)): - raise ValueError(f"Estimator must be an instance of {ESTIMATORS_CLASSES}") - return self - - def _validate_evaluator(self, evaluator: Evaluator) -> 'MLCVModel': - """ - Validate the evaluator. - - :param evaluator: The evaluator to validate. - :return: The validated evaluator. - """ - # check evaluator is an instance of EVALUATORS_CLASSES - if not isinstance(evaluator, tuple(EVALUATORS_CLASSES)): - raise ValueError(f"Evaluator must be an instance of {EVALUATORS_CLASSES}") - return self - - def _validate_estimator_params(self, estimator_params: Dict[str, Any]) -> None: - """ - Validate the estimator parameters. - - :param estimator_params: A dictionary containing the parameter names as keys and values as values. - """ - if estimator_params is None: - return - for param, _ in estimator_params.items(): - if not self.estimator.hasParam(param): - raise AttributeError(f"{self.estimator.__class__.__name__} does not have attribute {param}") - - def _validate_evaluator_params(self, evaluator_params: Dict[str, Any]) -> None: - """ - Validate the evaluator parameters. - - :param evaluator_params: A dictionary containing the parameter names as keys and values as values. 
- """ - if evaluator_params is None: - return - for param, _ in evaluator_params.items(): - if not self.evaluator.hasParam(param): - raise AttributeError(f"{self.evaluator.__class__.__name__} does not have attribute {param}") - - def _set_evaluator_params(self) -> 'MLCVModel': - """ - Set evaluator parameters. - """ - if self.evaluator_params is not None: - self.evaluator = self.evaluator.setParams(**self.evaluator_params) - return self - - def _set_estimator_params(self) -> 'MLCVModel': - """ - Set estimator parameters. - """ - if self.estimator_params is not None: - self.estimator = self.estimator.setParams(**self.estimator_params) - return self - - def _set_cv_params(self, cv_params: Dict[str, Any]) -> 'MLCVModel': - """ - Parse the cross-validator parameters to create an instance of CrossValidator. - - :param cv_params: A dictionary containing the parameter names as keys and values as values. - :return: An instance of CrossValidator. - """ - - for param, value in cv_params.items(): - if hasattr(self._cross_validator, param): - setattr(self._cross_validator, param, value) - else: - raise AttributeError(f"{self._cross_validator.__class__.__name__} does not have attribute {param}") - return self - - def _set_cross_validator(self) -> 'MLCVModel': - """ - Build the model using the cross-validator. - - :return: The CrossValidator model. - """ - try: - self._cross_validator = CrossValidator( - estimator=self.estimator, - estimatorParamMaps=self.grid_params, - evaluator=self.evaluator, - ) - if self.cv_params is not None: - self._cross_validator = self._cross_validator.setParams(**self.cv_params) - return self - except Exception as e: - print(f"An error occurred while creating the CrossValidator: {str(e)}") - # Handle the exception or raise it to be handled by the caller - raise - - def fit(self, fsdf: FSDataFrame) -> 'MLCVModel': - """ - Fit the model using the cross-validator. - - :return: The CrossValidatorModel after fitting. 
- """ - # Extract the Spark DataFrame and label column name from FSDataFrame - self._fsdf = fsdf - - if self._cross_validator is None or self.estimator is None or self.evaluator is None: - raise ValueError("Cross-validator, estimator, or evaluator not set properly.") - - self._fitted_cv_model = self._cross_validator.fit(self._fsdf.get_sdf_vector()) - return self - - def _get_best_model(self) -> Model: - """ - Get the best model from the fitted CrossValidatorModel. - - :return: The best model. - """ - if self._fitted_cv_model is None: - raise ValueError("CrossValidatorModel not fitted. Use fit() to fit the model.") - self._best_model = self._fitted_cv_model.bestModel - return self._best_model - - # define a static method that allows to set a ml model based on the model type - @staticmethod - def create_model(model_type: str, - estimator_params: Dict[str, Any] = None, - evaluator_params: Dict[str, Any] = None, - grid_params: Dict[str, List[Any]] = None, - cv_params: Dict[str, Any] = None) -> 'MLCVModel': - """ - Set a machine learning model based on the model type. - - :param model_type: The type of model to set. - :param estimator_params: Parameters for the estimator. - :param evaluator_params: Parameters for the evaluator. - :param grid_params: A dictionary containing the parameter names as keys and a list of values as values. - :param cv_params: Parameters for the cross-validator. - - :return: An instance of MLModel. 
- """ - if model_type == RF_BINARY: - estimator = RandomForestClassifier() - evaluator = BinaryClassificationEvaluator() - elif model_type == LSVC_BINARY: - estimator = LinearSVC() - evaluator = BinaryClassificationEvaluator() - elif model_type == RF_MULTILABEL: - estimator = RandomForestClassifier() - evaluator = MulticlassClassificationEvaluator() - elif model_type == LR_MULTILABEL: - estimator = LogisticRegression() - evaluator = MulticlassClassificationEvaluator() - elif model_type == RF_REGRESSION: - estimator = RandomForestRegressor() - evaluator = RegressionEvaluator() - else: - raise ValueError(f"Unsupported model type: {model_type}." - f"Supported model types are: {list(ML_METHODS.keys())}") - - ml_method = MLCVModel( - estimator=estimator, - evaluator=evaluator, - estimator_params=estimator_params, - evaluator_params=evaluator_params, - grid_params=grid_params, - cv_params=cv_params - ) - - return ml_method - - def get_eval_metric_name(self) -> str: - """ - Get the evaluation metric name. - - :return: The evaluation metric name. - """ - return self.evaluator.getMetricName() - - def get_feature_scores(self) -> pd.DataFrame: - - # TODO: This function should be able to parse all available models. - - indexed_features = self._fsdf.get_features_indexed() - best_model = self._get_best_model() - - # raise exception if the model is not none - if best_model is None: - raise ValueError("No ML model have been fitted. 
Use fit() to fit the model.") - - df_features = pd.DataFrame(indexed_features.to_numpy(), - columns=["features"]) - - if isinstance(best_model, (RandomForestClassificationModel, RandomForestRegressionModel)): - df_scores = pd.DataFrame( - data=best_model.featureImportances.toArray(), - columns=["scores"] - ) - - df_scores = df_scores.reset_index(level=0).rename(columns={"index": "feature_index"}) - - # merge the feature scores with the feature names - df = df_features.merge( - df_scores, how="right", left_index=True, right_index=True - ) # index-to-index merging - - # sort the dataframe by scores in descending order - df = df.sort_values(by="scores", ascending=False) - - # add feature percentile rank to the features_scores dataframe - df['percentile_rank'] = df['scores'].rank(pct=True) - - return df - - else: - raise ValueError("Unsupported model type. " - "Only RandomForestClassificationModel, " - "RandomForestRegressionModel, and LinearSVCModel are supported.") - - def get_eval_metric_on_training(self) -> float: - """ - Get the evaluation metric on training data from a trained CrossValidatorModel (best model). - - :return: A dictionary containing the evaluation metric name and value. - """ - - # TODO: This function should be able to parse all available models. - - # get the best model from the fitted cross-validator model - best_model = self._get_best_model() - - # get the eval metric name from the evaluator - eval_metric_name = self.get_eval_metric_name() - - if isinstance(best_model, (RandomForestClassificationModel, LogisticRegressionModel)): - metric_value = getattr(best_model.summary, eval_metric_name) - - elif isinstance(best_model, LinearSVCModel): - metric_value = getattr(best_model.summary(), eval_metric_name) - - else: - warnings.warn("Unsupported model type. 
Unable to get evaluation metric.") - metric_value = None - - return metric_value - - def get_eval_metric_on_testing(self, test_data: FSDataFrame) -> float: - """ - Get accuracy on test data from a trained CrossValidatorModel (best model). - - :param test_data: The test data as a FSDataFrame object. - :return: accuracy - """ - - # TODO: This function should be able to parse all available models. - - # get the best model from the fitted cross-validator model - best_model = self._get_best_model() - - # get test data features harmonized with training features - training_features = self._fsdf.get_features_names() - test_data = test_data.filter_features(training_features, keep=True) - - # predict the test data - predictions = None - if isinstance(best_model, (RandomForestClassificationModel, LinearSVCModel, LogisticRegressionModel)): - predictions = best_model.transform(test_data.get_sdf_vector()) - - metric_value = None - if predictions is not None: - metric_value = self.evaluator.evaluate(predictions) - - return metric_value diff --git a/fsspark/fs/multivariate.py b/fsspark/fs/multivariate.py deleted file mode 100644 index f4af43e..0000000 --- a/fsspark/fs/multivariate.py +++ /dev/null @@ -1,147 +0,0 @@ -import logging -from typing import List - -import numpy as np -import pyspark -from pyspark.ml.feature import (VarianceThresholdSelector) -from pyspark.ml.stat import Correlation - -from fsspark.fs.constants import MULTIVARIATE_METHODS, MULTIVARIATE_CORRELATION, MULTIVARIATE_VARIANCE - -from fsspark.fs.core import FSDataFrame -from fsspark.fs.utils import find_maximal_independent_set -from fsspark.utils.generic import tag - -logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s") -logger = logging.getLogger("FSSPARK:MULTIVARIATE") -logger.setLevel(logging.INFO) - - -@tag("experimental") -def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame, - features_col: str = 'features', - corr_method: str = "pearson") -> np.ndarray: - """ - Compute 
features Matrix Correlation. - - :param sdf: Spark DataFrame - :param features_col: Name of the feature column vector name. - :param corr_method: One of `pearson` (default) or `spearman`. - - :return: Numpy array. - """ - - logger.warning("Warning: Computed matrix correlation will be collected into the drive with this implementation.\n" - "This may cause memory issues. Use it preferably with small datasets.") - logger.info(f"Computing correlation matrix using {corr_method} method.") - - mcorr = (Correlation - .corr(sdf, features_col, corr_method) - .collect()[0][0] - .toArray() - ) - return mcorr - - -@tag("experimental") -def multivariate_correlation_selector(fsdf: FSDataFrame, - strict: bool = True, - corr_threshold: float = 0.75, - corr_method: str = "pearson") -> List[str]: - """ - Compute the correlation matrix (Pearson) among input features and select those below a specified threshold. - - :param fsdf: Input FSDataFrame - :param strict: If True (default), apply hard filtering (strict) to remove highly correlated features. - Otherwise, find the maximal independent set of highly correlated features (approximate method). - `Warning`: The approximate method is experimental. - :param corr_threshold: Minimal correlation threshold to consider two features correlated. - :param corr_method: One of `pearson` (default) or `spearman`. 
- - :return: List of selected features names - """ - - colum_vector_features = 'features' - sdf = fsdf.get_sdf_vector(output_column_vector=colum_vector_features) - - # compute correlation matrix - mcorr = _compute_correlation_matrix(sdf, - features_col=colum_vector_features, - corr_method=corr_method) - - mcorr = np.abs(mcorr) # get absolute correlation value - combs_above_cutoff = np.triu(mcorr, k=1) > corr_threshold # create bool matrix that meet criteria - correlated_col_index = tuple(np.column_stack(np.where(combs_above_cutoff))) # get correlated pairs cols index - - index_to_remove = set() - if strict: - # hard filtering method - # Original implementation: https://www.rdocumentation.org/packages/caret/versions/6.0-93/topics/findCorrelation - cols_mean = np.mean(mcorr, axis=1) # get cols index mean - for pairs in correlated_col_index: - i = pairs[0] - j = pairs[1] - index_to_remove.add(i if cols_mean[i] > cols_mean[j] else j) - else: - # approximate method - index_to_remove = find_maximal_independent_set(correlated_col_index, keep=False) - - features = fsdf.get_features_names() # get all current features - features_to_remove = fsdf.get_features_by_index(index_to_remove) - selected_features = [sf for sf in features if sf not in features_to_remove] - - return selected_features - - -@tag("spark implementation") -def multivariate_variance_selector(fsdf: FSDataFrame, - variance_threshold: float = 0.0) -> List[str]: - """ - Select features after removing low-variance ones (e.g., features with quasi-constant value across samples). - - :param fsdf: Input FSDataFrame - :param variance_threshold: Minimal variance value allowed to select a feature. 
- - :return: List of selected features names - """ - - colum_vector_features = 'features' - sdf = fsdf.get_sdf_vector(output_column_vector=colum_vector_features) - - selector = VarianceThresholdSelector() - (selector - .setFeaturesCol(colum_vector_features) - .setOutputCol("selectedFeatures") - .setVarianceThreshold(variance_threshold) - ) - - model = selector.fit(sdf) - selected_features_indices = set(model.selectedFeatures) - selected_features = fsdf.get_features_by_index(selected_features_indices) - - return selected_features - - -def multivariate_filter(fsdf: FSDataFrame, - multivariate_method: str = 'm_corr', - **kwargs) -> FSDataFrame: - """ - Filter features after applying a multivariate feature selector method. - - :param fsdf: Input FSDataFrame - :param multivariate_method: Multivariate selector method. - Possible values are 'm_corr' or 'variance'. - - :return: Filtered FSDataFrame - """ - if multivariate_method == MULTIVARIATE_CORRELATION: - selected_features = multivariate_correlation_selector(fsdf, **kwargs) - elif multivariate_method == MULTIVARIATE_VARIANCE: - selected_features = multivariate_variance_selector(fsdf, **kwargs) - else: - raise ValueError(f"Invalid multivariate method: {multivariate_method}. 
" - f"Choose one of {MULTIVARIATE_METHODS.keys()}.") - - logger.info(f"Applying multivariate filter {multivariate_method}.") - - return fsdf.filter_features(selected_features, keep=True) diff --git a/fsspark/fs/univariate.py b/fsspark/fs/univariate.py deleted file mode 100644 index 1103762..0000000 --- a/fsspark/fs/univariate.py +++ /dev/null @@ -1,133 +0,0 @@ -import logging -from typing import Dict, List - -import pyspark.sql.functions as f -from pyspark.ml.feature import UnivariateFeatureSelector - -from fsspark.fs.constants import ANOVA, UNIVARIATE_CORRELATION, F_REGRESSION, UNIVARIATE_METHODS - -from fsspark.fs.core import FSDataFrame -from fsspark.utils.generic import tag - -logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s") -logger = logging.getLogger("FSSPARK:UNIVARIATE") -logger.setLevel(logging.INFO) - - -@tag("spark implementation") -def compute_univariate_corr(fsdf: FSDataFrame) -> Dict[str, float]: - """ - Compute the correlation coefficient between every column (features) in the input DataFrame and the label (class). - - :param fsdf: Input FSDataFrame - - :return: Return dict {feature -> corr} - """ - - sdf = fsdf.get_sdf() - features = fsdf.get_features_names() - label = fsdf.get_label_col_name() - - u_corr = sdf.select([f.abs(f.corr(sdf[c], sdf[label])).alias(c) for c in features]) - - return u_corr.first().asDict() - - -def univariate_correlation_selector(fsdf: FSDataFrame, - corr_threshold: float = 0.3) -> List[str]: - """ - Select features based on its correlation with a label (class), if corr value is less than a specified threshold. - Expected both features and label to be of type numeric. - - :param fsdf: FSDataFrame - :param corr_threshold: Maximal correlation threshold allowed between feature and label. 
- - :return: List of selected features names - """ - d = compute_univariate_corr(fsdf) - selected_features = [k for k in d.keys() if d.get(k) <= corr_threshold] - - return selected_features - - -@tag("spark implementation") -def univariate_selector(fsdf: FSDataFrame, - label_type: str = 'categorical', - selection_mode: str = 'percentile', - selection_threshold: float = 0.8, - **kwargs) -> List[str]: - """ - Wrapper for `UnivariateFeatureSelector`. - See https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.UnivariateFeatureSelector.html - - Only continues features are supported. If label is categorical, ANOVA test is used. If label is of type continues - then an F-regression test is used. - - :param fsdf: Input FSDataFrame - :param label_type: Type of label. Possible values are 'categorical' or 'continuous'. - :param selection_mode: Mode for feature selection. Possible values are 'numTopFeatures' or 'percentile'. - :param selection_threshold: Number of features to select or the percentage of features to select. - - :return: List of selected features names - """ - - vector_col_name = 'features' - sdf = fsdf.get_sdf_vector(output_column_vector=vector_col_name) - label = fsdf.get_label_col_name() - - # set selector - if label_type == 'categorical': - # TODO: print msg to logger with the method being used here... - print("ANOVA (F-classification) univariate feature selection") - elif label_type == 'continuous': - # TODO: print msg to logger with the method being used here... 
- print("F-value (F-regression) univariate feature selection") - else: - raise ValueError("`label_type` must be one of categorical or continuous") - - selector = UnivariateFeatureSelector(**kwargs) - (selector - .setLabelType(label_type) - .setFeaturesCol(vector_col_name) - .setFeatureType("continuous") - .setOutputCol("selectedFeatures") - .setLabelCol(label) - .setSelectionMode(selection_mode) - .setSelectionThreshold(selection_threshold) - ) - - model = selector.fit(sdf) - selected_features_indices = model.selectedFeatures - selected_features = fsdf.get_features_by_index(selected_features_indices) - - return selected_features - - -@tag("spark implementation") -def univariate_filter(fsdf: FSDataFrame, - univariate_method: str = 'u_corr', - **kwargs) -> FSDataFrame: - """ - Filter features after applying a univariate feature selector method. - - :param fsdf: Input FSDataFrame - :param univariate_method: Univariate selector method. - Possible values are 'u_corr', 'anova' (categorical label) - or 'f_regression' (continuous label). - - :return: Filtered FSDataFrame - """ - - if univariate_method == ANOVA: - selected_features = univariate_selector(fsdf, label_type='categorical', **kwargs) - elif univariate_method == F_REGRESSION: - selected_features = univariate_selector(fsdf, label_type='continuous', **kwargs) - elif univariate_method == UNIVARIATE_CORRELATION: - selected_features = univariate_correlation_selector(fsdf, **kwargs) - else: - raise ValueError(f"Univariate method {univariate_method} not supported. 
" - f"Expected one of {UNIVARIATE_METHODS.keys()}") - - logger.info(f"Applying univariate filter {univariate_method}.") - - return fsdf.filter_features(selected_features, keep=True) diff --git a/fsspark/pipeline/fs_pipeline_example.py b/fsspark/pipeline/fs_pipeline_example.py deleted file mode 100644 index c7b9f75..0000000 --- a/fsspark/pipeline/fs_pipeline_example.py +++ /dev/null @@ -1,67 +0,0 @@ -""" -Example of a feature selection pipeline implemented in fsspark. - -After data import and pre-processing, the pipeline applies univariate correlation filter, -multivariate correlation filter and Randon Forest classification. - -""" - -from fsspark.config.context import init_spark, stop_spark_session -from fsspark.fs.core import FSDataFrame -from fsspark.fs.methods import FSPipeline, FSUnivariate, FSMultivariate, FSMLMethod -from fsspark.utils.datasets import get_tnbc_data_path -from fsspark.utils.io import import_table_as_psdf - -# Init spark -init_spark(apply_pyarrow_settings=True, - apply_extra_spark_settings=True, - apply_pandas_settings=True) - -# 1. Import data -df = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5) -fsdf = FSDataFrame(df, sample_col='Sample', label_col='label') - -# 2. Split data -training_data, testing_data = fsdf.split_df(split_training_factor=0.6) - -# 3. Set feature selection methods -# create a Univariate object -univariate = FSUnivariate( - fs_method='anova', - selection_mode='percentile', - selection_threshold=0.8) - -# create a Multivariate object -multivariate = FSMultivariate( - fs_method='m_corr', - corr_threshold=0.75, - corr_method="pearson" -) - -# create a MLMethod object -rf_classifier = FSMLMethod( - fs_method='rf_multilabel', - rfe=True, - rfe_iterations=2, - percent_to_keep=0.9, - estimator_params={'labelCol': 'label'}, - evaluator_params={'metricName': 'accuracy'}, - grid_params={'numTrees': [10, 15], 'maxDepth': [5, 10]}, - cv_params={'parallelism': 2, 'numFolds': 5} -) - -# 4. 
Create a pipeline object -fs_pipeline = FSPipeline(df_training=training_data, - df_testing=testing_data, - fs_stages=[univariate, multivariate, rf_classifier]) - -# 5. Run the pipeline -results = fs_pipeline.run() - -# Print results -print(f"Accuracy on training: {results['training_metric']}") -print(f"Accuracy on testing: {results['testing_metric']}") -print(results.get('feature_scores')) - - -stop_spark_session() diff --git a/fsspark/tests/test_FSDataFrame.py b/fsspark/tests/test_FSDataFrame.py deleted file mode 100644 index 2376b99..0000000 --- a/fsspark/tests/test_FSDataFrame.py +++ /dev/null @@ -1,79 +0,0 @@ -import unittest - -from fsspark.config.context import init_spark, stop_spark_session -from fsspark.fs.core import FSDataFrame -from fsspark.utils.datasets import get_tnbc_data_path -from fsspark.utils.io import import_table_as_psdf - - -class FSDataFrameTest(unittest.TestCase): - """ - Define testing methods for FSDataFrame class. - """ - - def setUp(self) -> None: - init_spark(apply_pyarrow_settings=True, - apply_extra_spark_settings=True, - apply_pandas_settings=True) - - def tearDown(self) -> None: - stop_spark_session() - - @staticmethod - def import_FSDataFrame(): - df = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5) - fsdf = FSDataFrame(df, sample_col='Sample', label_col='label') - return fsdf - - def test_FSDataFrame(self): - """ - Test FSDataFrame class. - :return: None - """ - - # create object of type FSDataFrame - fsdf = self.import_FSDataFrame() - - self.assertEqual(len(fsdf.get_features_names()), 500) - self.assertEqual(len(fsdf.get_sample_label()), 44) - - def test_get_sdf_vector(self): - """ - Test get_sdf_vector method. - :return: None - """ - - fsdf = self.import_FSDataFrame() - - sdf = fsdf.get_sdf_vector(output_column_vector='features') - sdf.show(5) - self.assertEqual(len(sdf.columns), 4) - - def test_scale_features(self): - """ - Test scale_features method. 
- :return: None - """ - - fsdf = self.import_FSDataFrame() - fsdf = fsdf.scale_features(scaler_method='min_max') - - fsdf.get_sdf().show(10) - self.assertGreaterEqual(min(fsdf.to_psdf()['tr|E9PBJ4'].to_numpy()), 0.0) - self.assertLessEqual(max(fsdf.to_psdf()['tr|E9PBJ4'].to_numpy()), 1.0) - - def test_get_features_indices(self): - """ - Test get_features_indices method. - :return: None - """ - - fsdf = self.import_FSDataFrame() - feature_indices = fsdf.get_features_indexed() - feature_names = feature_indices.loc[[0, 1, 2, 5]].tolist() - - self.assertTrue(all([x in ['tr|E9PBJ4', 'sp|P07437', 'sp|P68371', 'tr|F8VWX4'] for x in feature_names])) - - -if __name__ == '__main__': - unittest.main() diff --git a/fsspark/tests/test_data_preprocessing.py b/fsspark/tests/test_data_preprocessing.py deleted file mode 100644 index 85a6e37..0000000 --- a/fsspark/tests/test_data_preprocessing.py +++ /dev/null @@ -1,79 +0,0 @@ -import unittest - -import numpy as np - -from fsspark.config.context import init_spark, stop_spark_session -from fsspark.fs.core import FSDataFrame -from fsspark.fs.utils import compute_missingness_rate, remove_features_by_missingness_rate, impute_missing -from fsspark.utils.datasets import get_tnbc_data_missing_values_path -from fsspark.utils.io import import_table_as_psdf - - -class TestDataPreprocessing(unittest.TestCase): - """ - Define testing methods for data preprocessing (e.g, scaling, imputation, etc.) - - """ - - def setUp(self) -> None: - init_spark(apply_pyarrow_settings=True, - apply_extra_spark_settings=True, - apply_pandas_settings=True) - - def tearDown(self) -> None: - stop_spark_session() - - @staticmethod - def import_FSDataFrame() -> FSDataFrame: - """ - Import FSDataFrame object with missing values. 
- Number of samples: 44 - Number of features: 10 (5 with missing values) - :return: - """ - df = import_table_as_psdf(get_tnbc_data_missing_values_path(), n_partitions=5) - fsdf = FSDataFrame(df, sample_col='Sample', label_col='label') - return fsdf - - def test_compute_missingness_rate(self): - """ - Test compute_missingness_rate method. - :return: None - """ - - fsdf = self.import_FSDataFrame() - features_missing_rates = compute_missingness_rate(fsdf) - self.assertEqual(features_missing_rates.get('tr|E9PBJ4'), 0.0) - self.assertAlmostEqual(features_missing_rates.get('sp|P07437'), 0.295, places=2) - - def test_filter_by_missingness_rate(self): - """ - Test filter_missingness_rate method. - :return: None - """ - - fsdf = self.import_FSDataFrame() - fsdf = remove_features_by_missingness_rate(fsdf, threshold=0.15) - # print number of features - print(f"Number of remaining features: {fsdf.count_features()}") - - self.assertEqual(fsdf.count_features(), 6) - - def test_impute_missing(self): - """ - Test impute_missing method. Impute missing values using the mean across columns. 
- :return: None - """ - - fsdf = self.import_FSDataFrame() - fsdf = impute_missing(fsdf, strategy='mean') - - # Collect features as array - array = fsdf._collect_features_as_array() - - # Check if there are no missing (NaNs) or null values - self.assertFalse(np.isnan(array).any()) - - -if __name__ == '__main__': - unittest.main() diff --git a/fsspark/tests/test_fs_pipeline.py b/fsspark/tests/test_fs_pipeline.py deleted file mode 100644 index a1f6b5e..0000000 --- a/fsspark/tests/test_fs_pipeline.py +++ /dev/null @@ -1,71 +0,0 @@ -import unittest - -from fsspark.config.context import init_spark, stop_spark_session -from fsspark.fs.core import FSDataFrame -from fsspark.fs.methods import FSPipeline, FSUnivariate, FSMultivariate, FSMLMethod -from fsspark.utils.datasets import get_tnbc_data_path -from fsspark.utils.io import import_table_as_psdf - - -class FeatureSelectionPipelineTest(unittest.TestCase): - - def setUp(self) -> None: - init_spark(apply_pyarrow_settings=True, - apply_extra_spark_settings=True, - apply_pandas_settings=True) - - def tearDown(self) -> None: - stop_spark_session() - - @staticmethod - def import_FSDataFrame(): - df = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5) - fsdf = FSDataFrame(df, sample_col='Sample', label_col='label') - return fsdf - - def test_feature_selection_pipeline(self): - fsdf = self.import_FSDataFrame() - - training_data, testing_data = fsdf.split_df(split_training_factor=0.6) - - # create a Univariate object - univariate = FSUnivariate( - fs_method='anova', - selection_mode='percentile', - selection_threshold=0.8) - - # create a Multivariate object - multivariate = FSMultivariate( - fs_method='m_corr', - corr_threshold=0.75, - corr_method="pearson" - ) - - # create a MLMethod object - rf_classifier = FSMLMethod( - fs_method='rf_multilabel', - rfe=True, - rfe_iterations=2, - percent_to_keep=0.9, - estimator_params={'labelCol': 'label'}, - evaluator_params={'metricName': 'accuracy'}, - grid_params={'numTrees': [10, 
15], 'maxDepth': [5, 10]}, - cv_params={'parallelism': 2, 'numFolds': 5} - ) - - # create a pipeline object - fs_pipeline = FSPipeline(df_training=training_data, - df_testing=testing_data, - fs_stages=[univariate, multivariate, rf_classifier]) - - # run the pipeline - results = fs_pipeline.run() - - # print results - print(results) - - assert results.get('training_metric') > 0.9 - - -if __name__ == '__main__': - unittest.main() diff --git a/fsspark/tests/test_import_export.py b/fsspark/tests/test_import_export.py deleted file mode 100644 index 57b0c5b..0000000 --- a/fsspark/tests/test_import_export.py +++ /dev/null @@ -1,45 +0,0 @@ -import unittest - -import pyspark -import pyspark.pandas as ps - -from fsspark.config.context import init_spark, stop_spark_session -from fsspark.utils.datasets import get_tnbc_data_path -from fsspark.utils.io import import_table, import_table_as_psdf - - -class TestImportExport(unittest.TestCase): - - def setUp(self) -> None: - init_spark(apply_pyarrow_settings=True, - apply_extra_spark_settings=True, - apply_pandas_settings=True) - - def tearDown(self) -> None: - stop_spark_session() - - def test_import_tsv(self): - """ - Test import tsv file as Spark DataFrame. - :return: None - """ - df = import_table(path=get_tnbc_data_path(), - n_partitions=10) - - self.assertIsInstance(df, pyspark.sql.DataFrame) - self.assertEqual(df.count(), 44) - - def test_import_tsv_as_psdf(self): - """ - Test import tsv file as Pandas on Spark DataFrame (PoS). 
- :return: None - """ - df = import_table_as_psdf(path=get_tnbc_data_path(), - n_partitions=10) - - self.assertIsInstance(df, ps.frame.DataFrame) - self.assertEqual(df.shape, (44, 502)) - - -if __name__ == '__main__': - unittest.main() diff --git a/fsspark/tests/test_ml_methods.py b/fsspark/tests/test_ml_methods.py deleted file mode 100644 index 3dc4bda..0000000 --- a/fsspark/tests/test_ml_methods.py +++ /dev/null @@ -1,180 +0,0 @@ -import unittest - -from pyspark.ml.classification import (RandomForestClassifier, - LogisticRegression) -from pyspark.ml.evaluation import (BinaryClassificationEvaluator, - MulticlassClassificationEvaluator) - -from fsspark.config.context import init_spark, stop_spark_session -from fsspark.fs.core import FSDataFrame -from fsspark.fs.ml import MLCVModel -from fsspark.utils.datasets import get_tnbc_data_path -from fsspark.utils.io import import_table_as_psdf - - -class MLMethodTest(unittest.TestCase): - - def setUp(self) -> None: - init_spark(apply_pyarrow_settings=True, - apply_extra_spark_settings=True, - apply_pandas_settings=True) - - def tearDown(self) -> None: - stop_spark_session() - - @staticmethod - def import_FSDataFrame(): - df = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5) - fsdf = FSDataFrame(df, sample_col='Sample', label_col='label') - return fsdf - - def test_build_model_using_cross_validator(self): - fsdf = self.import_FSDataFrame() - estimator = RandomForestClassifier() - evaluator = BinaryClassificationEvaluator() - grid_params = {'numTrees': [10, 20, 30], 'maxDepth': [5, 10, 15]} - ml_method = MLCVModel( - estimator=estimator, - evaluator=evaluator, - estimator_params=None, - grid_params=None, - cv_params=None - ) - - print(ml_method._cross_validator.__str__()) - assert ml_method._cross_validator is not None - - def test_get_feature_scores_random_forest_classifier(self): - # Create a sample FSDataFrame - fsdf = self.import_FSDataFrame() - - # Create a RandomForestClassifier model - estimator = 
RandomForestClassifier() - evaluator = MulticlassClassificationEvaluator() - estimator_params = {'labelCol': 'label'} - grid_params = {'numTrees': [10, 20, 30], 'maxDepth': [5, 10, 15]} - cv_params = {'parallelism': 2, 'numFolds': 5, 'collectSubModels': False} - - ml_method = MLCVModel( - estimator=estimator, - evaluator=evaluator, - estimator_params=estimator_params, - grid_params=grid_params, - cv_params=cv_params - ) - - (ml_method - .fit(fsdf) - ) - - # Get the feature scores - feature_scores = ml_method.get_feature_scores() - - # Assert that the feature scores DataFrame is not empty - assert not feature_scores.empty - - # Assert that the feature scores DataFrame has the expected columns - expected_columns = ['features', 'feature_index', 'scores', 'percentile_rank'] - assert list(feature_scores.columns) == expected_columns - - # check if dataframe is sorted by scores (descending) - assert feature_scores['scores'].is_monotonic_decreasing - - print(feature_scores) - - def test_multilabel_rf_model(self): - fsdf = self.import_FSDataFrame() - training_data, testing_data = fsdf.split_df(split_training_factor=0.8) - - estimator = RandomForestClassifier() - evaluator = MulticlassClassificationEvaluator(metricName='accuracy') - estimator_params = {'labelCol': 'label'} - grid_params = {'numTrees': [5, 10], 'maxDepth': [3, 5]} - cv_params = {'parallelism': 2, 'numFolds': 3} - - ml_method = MLCVModel( - estimator=estimator, - evaluator=evaluator, - estimator_params=estimator_params, - grid_params=grid_params, - cv_params=cv_params - ) - - (ml_method - .fit(training_data) - ) - - # get the accuracy on training - eval_training = ml_method.get_eval_metric_on_training() - print(f"Accuracy on training data: {eval_training}") - - # get the accuracy on testing - testing_acc = ml_method.get_eval_metric_on_testing(testing_data) - print(f"Accuracy on test data: {testing_acc}") - assert testing_acc > 0.7 - - def test_multilabel_lr_model(self): - fsdf = self.import_FSDataFrame() - 
training_data, testing_data = fsdf.split_df(split_training_factor=0.6) - - estimator = LogisticRegression() - evaluator = MulticlassClassificationEvaluator(metricName='accuracy') - estimator_params = {'labelCol': 'label'} - grid_params = {'regParam': [0.1, 0.01]} - cv_params = {'parallelism': 2, 'numFolds': 3} - - ml_method = MLCVModel( - estimator=estimator, - evaluator=evaluator, - estimator_params=estimator_params, - grid_params=grid_params, - cv_params=cv_params - ) - - (ml_method - .fit(training_data) - ) - - # get the accuracy on training - eval_training = ml_method.get_eval_metric_on_training() - print(f"Accuracy on training data: {eval_training}") - - # get the accuracy on testing - testing_acc = ml_method.get_eval_metric_on_testing(testing_data) - print(f"Accuracy on test data: {testing_acc}") - assert testing_acc > 0.7 - - def test_FSMLMethod(self): - from fsspark.fs.methods import FSMLMethod - - fsdf = self.import_FSDataFrame() - training_data, testing_data = fsdf.split_df(split_training_factor=0.7) - - estimator_params = {'labelCol': 'label'} - grid_params = {'numTrees': [5, 10], 'maxDepth': [3, 5]} - cv_params = {'parallelism': 2, 'numFolds': 3} - - ml_method = FSMLMethod( - fs_method='rf_multilabel', - rfe=True, - rfe_iterations=2, - percent_to_keep=0.9, - estimator_params=estimator_params, - evaluator_params={'metricName': 'accuracy'}, - grid_params=grid_params, - cv_params=cv_params - ) - - filtered_fsdf = ml_method.select_features(training_data) - - training_acc = ml_method.get_eval_metric_on_training() - print(f"Training accuracy: {training_acc}") - assert training_acc > 0.8 - - testing_acc = ml_method.get_eval_metric_on_testing(testing_data) - print(f"Testing accuracy: {testing_acc}") - assert testing_acc > 0.7 - - -if __name__ == '__main__': - unittest.main() diff --git a/fsspark/utils/__init__.py b/fsspark/utils/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/fsspark/utils/io.py b/fsspark/utils/io.py deleted file mode 
100644 index 951c5cb..0000000 --- a/fsspark/utils/io.py +++ /dev/null @@ -1,92 +0,0 @@ -import warnings - -import pyspark.pandas -import pyspark.sql - -from fsspark.config.context import PANDAS_ON_SPARK_API_SETTINGS - -warnings.filterwarnings("ignore") - - -def import_table(path: str, - header: bool = True, - sep: str = "\t", - n_partitions: int = 5) -> pyspark.sql.DataFrame: - """ - Import tsv file as Spark DataFrame. - - :param path: File path - :param header: True if the first row is header. - :param sep: Column separator - :param n_partitions: Minimal number of partitions - - :return: Spark DataFrame - """ - - _sc = pyspark.sql.SparkSession.getActiveSession() - - if _sc is None: - raise ValueError("Active Spark Session not found...") - - sdf = (_sc - .read - .option("delimiter", sep) - .option("header", header) - .option("inferSchema", "true") - .csv(path) - .repartition(n_partitions) - ) - return sdf - - -def import_parquet(path: str, - header: bool = True) -> pyspark.sql.DataFrame: - """ - Import parquet file as Spark DataFrame. - - :param path: File path - :param header: True if the first row is header. 
- - :return: Spark DataFrame - """ - - _sc = pyspark.sql.SparkSession.getActiveSession() - - if _sc is None: - raise ValueError("Active Spark Session not found...") - - sdf = (_sc - .read - .option("header", header) - .option("inferSchema", "true") - .parquet(path) - ) - return sdf - - -def import_table_as_psdf(path: str, - sep: str = "\t", - n_partitions: int = 5) -> pyspark.pandas.DataFrame: - """ - Import tsv file as Pandas on Spark DataFrame - - :param path: Path to TSV file - :param sep: Column separator (default: "\t") - :param n_partitions: Minimal number of partitions - - :return: Pandas on Spark DataFrame - """ - - import pyspark.pandas as ps - - # apply settings for pandas on spark api - [ps.set_option(k, PANDAS_ON_SPARK_API_SETTINGS.get(k)) - for k in PANDAS_ON_SPARK_API_SETTINGS.keys()] - - psdf = (ps - .read_csv(path, - sep=sep) - .spark - .repartition(n_partitions) - ) - return psdf diff --git a/requirements.txt b/requirements.txt index de6efc3..75bbbe8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,10 @@ -pyspark~=3.3.0 -networkx~=2.8.7 -numpy~=1.23.4 -setuptools~=65.5.0 -pandas~=1.5.1 -pyarrow~=8.0.0 \ No newline at end of file +networkx +numpy +setuptools +pandas +scikit-learn +scipy +psutil +pytest +matplotlib +memory-profiler \ No newline at end of file diff --git a/setup.py b/setup.py index 7beeec5..63593d3 100644 --- a/setup.py +++ b/setup.py @@ -4,23 +4,26 @@ long_description = fh.read() setup( - name='fsspark', - version='0.0.1', - url='https://github.com/bigbio/fsspark', - license='Apache-2.0', - author='Enrique Audain Martinez', - author_email='enrique.audain@gmail.com', - description='Feature selection in Spark', + name="fslite", + version="0.0.1", + url="https://github.com/bigbio/fslite", + license="Apache-2.0", + author="Enrique Audain Martinez", + author_email="enrique.audain@gmail.com", + description="Feature selection in Spark", long_description=long_description, long_description_content_type="text/markdown", 
packages=find_packages(), install_requires=[ - "pyspark", "numpy", "networkx", "setuptools", "pandas", - "pyarrow" + "pyarrow", + "scikit-learn", + "scipy", + "psutil", + "matplotlib", ], classifiers=[ # Classifiers for your package @@ -28,5 +31,5 @@ "License :: OSI Approved :: MIT License", "Operating System :: POSIX :: Linux", ], - python_requires='>=3.9.0', + python_requires=">=3.9.0", )