diff --git a/README.md b/README.md
index 574f880..f8d3e5c 100644
--- a/README.md
+++ b/README.md
@@ -41,4 +41,4 @@ pip install . -r requirements.txt
 ### Maintainers
 
 - Enrique Audain (https://github.com/enriquea)
-- Yasset Perez-Riverol (https://github.com/ypriverol
+- Yasset Perez-Riverol (https://github.com/ypriverol)
diff --git a/docs/README.data.md b/docs/README.data.md
index c486461..e812609 100644
--- a/docs/README.data.md
+++ b/docs/README.data.md
@@ -9,7 +9,7 @@
 Here we describe the main data structures used in `fsspark` and how to use them.
 
 The current module support as input data a headed Tab-separated values (TSV) file with `S x 2+F` dimensions,
 where `S` is the number of samples (rows) and `F` is the number of features (columns). The first column of the file
-is expected to contain the `sample IDs`, the second column the `response variable` and the remaining
+is expected to contain the `sample IDs`, the second column the `sample label` and the remaining
 columns the `features`. The response variable can be either binary, categorical or continuous; and should be encoded as
 `0` and `1` for binary variables, as integers for categorical variables and as floats for continuous variables.
@@ -20,12 +20,12 @@
 The following is an example of a TSV file with a binary response variable:
 
 ```
 ------------------------------------------------------------------------
-| sample_id | response | feature_1 | feature_2 | feature_3 | feature_4 |
+| sample_id | label | feature_1 | feature_2 | feature_3 | feature_4 |
 ------------------------------------------------------------------------
-| sample_1 | 0 | 0.1 | 0.2 | 0.3 | 0.4 |
-| sample_2 | 1 | 0.5 | 0.6 | 0.7 | 0.8 |
-| sample_3 | 0 | 0.9 | 0.10 | 0.11 | 0.12 |
-| sample_4 | 1 | 0.13 | 0.14 | 0.15 | 0.16 |
+| sample_1 | a | 0.1 | 0.2 | 0.3 | 0.4 |
+| sample_2 | b | 0.5 | 0.6 | 0.7 | 0.8 |
+| sample_3 | b | 0.9 | 0.10 | 0.11 | 0.12 |
+| sample_4 | c | 0.13 | 0.14 | 0.15 | 0.16 |
 ------------------------------------------------------------------------
 ```
diff --git a/docs/README.methods.md b/docs/README.methods.md
index 7b9caa2..0c2c977 100644
--- a/docs/README.methods.md
+++ b/docs/README.methods.md
@@ -69,7 +69,7 @@ from fsspark.fs.core import FSDataFrame
 from fsspark.fs.ml import cv_rf_classification, get_accuracy, get_predictions
 from fsspark.fs.multivariate import multivariate_filter
 from fsspark.fs.univariate import univariate_filter
-from fsspark.fs.utils import (filter_missingness_rate,
+from fsspark.fs.utils import (remove_features_by_missingness_rate,
                               impute_missing)
 from fsspark.utils.datasets import get_tnbc_data_path
 from fsspark.utils.io import import_table_as_psdf
@@ -86,8 +86,7 @@ fsdf = FSDataFrame(fsdf, sample_col='Sample', label_col='label')
 
 # Step 1. Data pre-processing.
 # a) Filter missingness rate
-fsdf = filter_missingness_rate(fsdf,
-                               threshold=0.1)
+fsdf = remove_features_by_missingness_rate(fsdf, threshold=0.1)
 
 # b) Impute data frame
 fsdf = impute_missing(fsdf)
diff --git a/fsspark/fs/core.py b/fsspark/fs/core.py
index 8fc16eb..b9e86aa 100644
--- a/fsspark/fs/core.py
+++ b/fsspark/fs/core.py
@@ -1,4 +1,5 @@
 import logging
+import numpy as np
 from typing import (Union,
                     Optional,
                     List,
@@ -203,6 +204,18 @@ def get_sdf_vector(self, output_column_vector: str = 'features') -> pyspark.sql.
 
         return sdf_vector
 
+    def _collect_features_as_array(self) -> np.ndarray:
+        """
+        Collect features from FSDataFrame as an array.
+        `Warning`: This method will collect the entire DataFrame into the driver.
+        Use this method on small datasets only (e.g., after filtering or splitting the data).
+
+        :return: Numpy array
+        """
+        sdf = self.get_sdf().select(*self.get_features_names())
+        a = np.array(sdf.collect())
+        return a
+
     def to_psdf(self) -> pyspark.pandas.DataFrame:
         """
         Convert Spark DataFrame to Pandas on Spark DataFrame
diff --git a/fsspark/fs/ml.py b/fsspark/fs/ml.py
index 8f861eb..9f372de 100644
--- a/fsspark/fs/ml.py
+++ b/fsspark/fs/ml.py
@@ -19,10 +19,13 @@
 )
 from pyspark.ml.regression import RandomForestRegressor, FMRegressor
 from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
+from pyspark.pandas import DataFrame
 
 from fsspark.fs.core import FSDataFrame
+from fsspark.utils.generic import tag
 
 
+@tag("spark implementation")
 def cv_rf_classification(
     fsdf: FSDataFrame, binary_classification: bool = True
 ) -> CrossValidatorModel:
@@ -34,7 +37,6 @@ def cv_rf_classification(
                                   Otherwise, implement a multi-class classification problem.
 
     :return: CrossValidatorModel
-    TODO: Consider here if make sense to return the full CV Model.
     """
     features_col = "features"
     sdf = fsdf.get_sdf_vector(output_column_vector=features_col)
@@ -69,6 +71,7 @@ def cv_rf_classification(
     return cv_model
 
 
+@tag("spark implementation")
 def cv_svc_classification(
     fsdf: FSDataFrame,
 ) -> CrossValidatorModel:
@@ -79,7 +82,6 @@ def cv_svc_classification(
 
     :param fsdf: FSDataFrame
     :return: CrossValidatorModel
-    TODO: Consider here if make sense to return the full CV Model.
     """
 
     features_col = "features"
@@ -108,6 +110,7 @@ def cv_svc_classification(
     return cv_model
 
 
+@tag("spark implementation")
 def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
     """
     Cross-validation with Random Forest regressor as estimator.
@@ -116,7 +119,6 @@ def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
 
     :param fsdf: FSDataFrame
     :return: CrossValidatorModel
-    TODO: Consider here if make sense to return the full CV Model.
     """
 
     features_col = "features"
@@ -148,6 +150,7 @@ def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
     return cv_model
 
 
+@tag("spark implementation")
 def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
     """
     Cross-validation with Factorization Machines as estimator.
@@ -156,7 +159,6 @@ def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
 
     :param fsdf: FSDataFrame
     :return: CrossValidatorModel
-    TODO: Do it make sense here to return the full CV Model??
     """
 
     features_col = "features"
@@ -184,12 +186,14 @@
 
 
 def get_accuracy(model: CrossValidatorModel) -> float:
     """
+    Get accuracy from a trained CrossValidatorModel (best model).
     # TODO: This function should be able to parse all available models. Currently only support RandomForestClassificationModel.
 
     :param model: Trained CrossValidatorModel
-    :return: Training accuracy
+    :return: Accuracy of the best model.
     """
+
     best_model = model.bestModel
     if isinstance(best_model, RandomForestClassificationModel):
         acc = best_model.summary.accuracy
@@ -200,7 +204,7 @@ def get_accuracy(model: CrossValidatorModel) -> float:
     return acc
 
 
-def get_predictions(model: CrossValidatorModel) -> pyspark.sql.DataFrame:
+def get_predictions(model: CrossValidatorModel) -> pyspark.pandas.DataFrame:
     """
     # TODO: This function should be able to parse all available models. Currently only support RandomForestClassificationModel.
@@ -219,11 +223,11 @@ def get_predictions(model: CrossValidatorModel) -> pyspark.sql.DataFrame:
         )
     else:
         pred = None
-    return pred
+    return pred.pandas_api() if pred is not None else None
 
 
 def get_feature_scores(model: CrossValidatorModel,
-                       indexed_features: pyspark.pandas.series.Series = None) -> pd.DataFrame:
+                       indexed_features: pyspark.pandas.series.Series = None) -> pyspark.pandas.DataFrame:
     """
     Extract features scores (e.g. importance or coefficients) from a trained CrossValidatorModel.
 
@@ -234,7 +238,7 @@ def get_feature_scores(model: CrossValidatorModel,
     :param indexed_features: If provided, report features names rather than features indices.
                              Usually, the output from `training_data.get_features_indexed()`.
 
-    :return: Pandas on DataFrame with feature importance
+    :return: Pandas DataFrame with feature importance
     """
 
     df_features = (None
                    if indexed_features is None
@@ -279,5 +283,5 @@ def get_feature_scores(model: CrossValidatorModel,
         return df.sort_values(by="coefficients", ascending=False)
 
     else:
-        df = None  # this should follow with parsing options for the different models.
-        return df
+        # TODO: here we should support other models.
+        pass
diff --git a/fsspark/fs/multivariate.py b/fsspark/fs/multivariate.py
index e0e34ea..c82f67b 100644
--- a/fsspark/fs/multivariate.py
+++ b/fsspark/fs/multivariate.py
@@ -8,18 +8,19 @@
 
 from fsspark.fs.core import FSDataFrame
 from fsspark.fs.utils import find_maximal_independent_set
+from fsspark.utils.generic import tag
 
 logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
 logger = logging.getLogger("FSSPARK:MULTIVARIATE")
 logger.setLevel(logging.INFO)
 
 
+@tag("experimental")
 def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame,
                                 features_col: str = 'features',
                                 method: str = "pearson") -> np.ndarray:
     """
     Compute features Matrix Correlation.
-    TODO: Warning: Computed matrix correlation will collected into the drive with this implementation.
 
     :param sdf: Spark DataFrame
     :param features_col: Name of the feature column vector name.
@@ -27,6 +28,11 @@ def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame,
 
     :return: Numpy array.
     """
+
+    logger.warning("The computed correlation matrix will be collected into the driver with this implementation.\n"
+                   "This may cause memory issues. Use it preferably with small datasets.")
+    logger.info(f"Computing correlation matrix using {method} method.")
+
     mcorr = (Correlation
              .corr(sdf, features_col, method)
              .collect()[0][0]
@@ -35,6 +41,7 @@ def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame,
     return mcorr
 
 
+@tag("experimental")
 def multivariate_correlation_selector(fsdf: FSDataFrame,
                                       strict: bool = True,
                                       corr_threshold: float = 0.75,
@@ -43,9 +50,9 @@ def multivariate_correlation_selector(fsdf: FSDataFrame,
     Compute the correlation matrix (Pearson) among input features and select those below a specified threshold.
 
     :param fsdf: Input FSDataFrame
-    :param strict: If True (default), apply hard filtering (strict) to remove highly related features.
+    :param strict: If True (default), apply hard filtering (strict) to remove highly correlated features.
                    Otherwise, find the maximal independent set of highly correlated features (approximate method).
-                   The approximate method is experimental.
+                   `Warning`: The approximate method is experimental.
    :param corr_threshold: Minimal correlation threshold to consider two features correlated.
    :param method: One of `pearson` (default) or `spearman`.
@@ -84,6 +91,7 @@ def multivariate_correlation_selector(fsdf: FSDataFrame,
     return selected_features
 
 
+@tag("spark implementation")
 def multivariate_variance_selector(fsdf: FSDataFrame,
                                    variance_threshold: float = 0.0) -> List[str]:
     """
diff --git a/fsspark/fs/univariate.py b/fsspark/fs/univariate.py
index e921208..56a21df 100644
--- a/fsspark/fs/univariate.py
+++ b/fsspark/fs/univariate.py
@@ -5,15 +5,17 @@
 from pyspark.ml.feature import UnivariateFeatureSelector
 
 from fsspark.fs.core import FSDataFrame
+from fsspark.utils.generic import tag
 
 logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
 logger = logging.getLogger("FSSPARK:UNIVARIATE")
 logger.setLevel(logging.INFO)
 
 
+@tag("spark implementation")
 def compute_univariate_corr(fsdf: FSDataFrame) -> Dict[str, float]:
     """
-    Compute the correlation coefficient between every column (features) in the input DataFrame and a defined target.
+    Compute the correlation coefficient between every feature column in the input DataFrame and the label (class).
 
     :param fsdf: Input FSDataFrame
 
@@ -32,7 +34,7 @@ def compute_univariate_corr(fsdf: FSDataFrame) -> Dict[str, float]:
 def univariate_correlation_selector(fsdf: FSDataFrame,
                                     corr_threshold: float = 0.3) -> List[str]:
     """
-    Select features based on its correlation with a target label, if corr value is less than a specified threshold.
+    Select features based on their correlation with a label (class); a feature is selected if its correlation value is less than a specified threshold.
     Expected both features and label to be of type numeric.
 
     :param fsdf: FSDataFrame
@@ -46,6 +48,7 @@ def univariate_correlation_selector(fsdf: FSDataFrame,
     return selected_features
 
 
+@tag("spark implementation")
 def univariate_selector(fsdf: FSDataFrame,
                         label_type: str = 'categorical',
                         **kwargs) -> List[str]:
@@ -63,7 +66,7 @@ def univariate_selector(fsdf: FSDataFrame,
     """
 
     vector_col_name = 'features'
-    vsdf = fsdf.get_sdf_vector(output_column_vector=vector_col_name)
+    sdf = fsdf.get_sdf_vector(output_column_vector=vector_col_name)
     label = fsdf.get_label_col_name()
 
     # set selector
@@ -85,13 +88,14 @@ def univariate_selector(fsdf: FSDataFrame,
                 .setLabelCol(label)
                 )
 
-    model = selector.fit(vsdf)
+    model = selector.fit(sdf)
     selected_features_indices = model.selectedFeatures
     selected_features = fsdf.get_features_by_index(selected_features_indices)
 
     return selected_features
 
 
+@tag("spark implementation")
 def univariate_filter(fsdf: FSDataFrame,
                       univariate_method: str = 'u_corr',
                       **kwargs) -> FSDataFrame:
@@ -100,7 +104,8 @@ def univariate_filter(fsdf: FSDataFrame,
 
     :param fsdf: Input FSDataFrame
     :param univariate_method: Univariate selector method.
-                              Possible values are 'u_corr', 'anova' or 'f_regression'.
+                              Possible values are 'u_corr', 'anova' (categorical label)
+                              or 'f_regression' (continuous label).
 
     :return: Filtered FSDataFrame
     """
diff --git a/fsspark/fs/utils.py b/fsspark/fs/utils.py
index 9931649..31d4c39 100644
--- a/fsspark/fs/utils.py
+++ b/fsspark/fs/utils.py
@@ -7,12 +7,14 @@
 from pyspark.ml.feature import Imputer
 
 from fsspark.fs.core import FSDataFrame
+from fsspark.utils.generic import tag
 
 logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
 logger = logging.getLogger("FSSPARK:UTILS")
 logger.setLevel(logging.INFO)
 
 
+@tag("spark implementation")
 def compute_missingness_rate(fsdf: FSDataFrame) -> Dict[str, float]:
     """
     Compute per features missingness rate.
@@ -28,7 +30,7 @@ def compute_missingness_rate(fsdf: FSDataFrame) -> Dict[str, float]:
     missing_rates = sdf.select(
         [
             (
-                f.sum(f.when(f.isnan(sdf[c]) | f.isnull(sdf[c]), 1).otherwise(0)) / n_instances
+                    f.sum(f.when(f.isnan(sdf[c]) | f.isnull(sdf[c]), 1).otherwise(0)) / n_instances
             ).alias(c)
             for c in features
         ]
@@ -37,15 +39,15 @@ def compute_missingness_rate(fsdf: FSDataFrame) -> Dict[str, float]:
     return missing_rates.first().asDict()
 
 
-def filter_missingness_rate(
-    fsdf: FSDataFrame, threshold: float = 0.15
+def remove_features_by_missingness_rate(
+        fsdf: FSDataFrame, threshold: float = 0.15
 ) -> FSDataFrame:
     """
     Remove features from FSDataFrame with missingness rate higher or equal than a specified threshold.
 
     :param fsdf: FSDataFrame.
     :param threshold: maximal missingness rate allowed to keep a feature.
-    :return:
+    :return: FSDataFrame with features removed.
     """
     d_rates = compute_missingness_rate(fsdf)
     features_to_remove = [k for k in d_rates.keys() if d_rates.get(k) >= threshold]
@@ -57,6 +59,7 @@ def filter_missingness_rate(
     return fsdf_filtered
 
 
+@tag("spark implementation")
 def impute_missing(fsdf: FSDataFrame, strategy: str = "mean") -> FSDataFrame:
     """
     Impute missing values using the mean, median or mode.
@@ -93,14 +96,16 @@ def impute_missing(fsdf: FSDataFrame, strategy: str = "mean") -> FSDataFrame:
     )
 
 
+@tag("experimental")
 def find_maximal_independent_set(pairs: Tuple[int], keep: bool = True) -> Set[int]:
     """
     Given a set of indices pairs, returns a random maximal independent set.
 
-    :param pairs:
+    :param pairs: Set of indices pairs.
     :param keep: If true (default), return the maximal independent set.
                  Otherwise, return the remaining indices after removing the maximal independent set.
-    :return:
+
+    :return: Set of indices (maximal independent set or remaining indices).
     """
 
     logger.warning("This method is experimental and have been not extensively tested...")
diff --git a/fsspark/pipeline/fs_corr_rf.py b/fsspark/pipeline/fs_corr_rf.py
index 1d6f369..feea1b8 100644
--- a/fsspark/pipeline/fs_corr_rf.py
+++ b/fsspark/pipeline/fs_corr_rf.py
@@ -11,7 +11,7 @@
 from fsspark.fs.ml import cv_rf_classification, get_accuracy, get_predictions, get_feature_scores
 from fsspark.fs.multivariate import multivariate_filter
 from fsspark.fs.univariate import univariate_filter
-from fsspark.fs.utils import filter_missingness_rate, impute_missing
+from fsspark.fs.utils import remove_features_by_missingness_rate, impute_missing
 from fsspark.utils.datasets import get_tnbc_data_path
 from fsspark.utils.io import import_table_as_psdf
 
@@ -26,7 +26,7 @@
 
 # Step 1. Data pre-processing.
 # a) Filter missingness rate
-fsdf = filter_missingness_rate(fsdf, threshold=0.1)
+fsdf = remove_features_by_missingness_rate(fsdf, threshold=0.1)
 
 # b) Impute data frame
 fsdf = impute_missing(fsdf)
diff --git a/fsspark/tests/test_data_preprocessing.py b/fsspark/tests/test_data_preprocessing.py
new file mode 100644
index 0000000..85a6e37
--- /dev/null
+++ b/fsspark/tests/test_data_preprocessing.py
@@ -0,0 +1,79 @@
+import unittest
+
+import numpy as np
+
+from fsspark.config.context import init_spark, stop_spark_session
+from fsspark.fs.core import FSDataFrame
+from fsspark.fs.utils import compute_missingness_rate, remove_features_by_missingness_rate, impute_missing
+from fsspark.utils.datasets import get_tnbc_data_missing_values_path
+from fsspark.utils.io import import_table_as_psdf
+
+
+class TestDataPreprocessing(unittest.TestCase):
+    """
+    Define testing methods for data preprocessing (e.g., scaling, imputation, etc.)
+
+    """
+
+    def setUp(self) -> None:
+        init_spark(apply_pyarrow_settings=True,
+                   apply_extra_spark_settings=True,
+                   apply_pandas_settings=True)
+
+    def tearDown(self) -> None:
+        stop_spark_session()
+
+    @staticmethod
+    def import_FSDataFrame() -> FSDataFrame:
+        """
+        Import FSDataFrame object with missing values.
+        Number of samples: 44
+        Number of features: 10 (5 with missing values)
+        :return: FSDataFrame with missing values.
+        """
+        df = import_table_as_psdf(get_tnbc_data_missing_values_path(), n_partitions=5)
+        fsdf = FSDataFrame(df, sample_col='Sample', label_col='label')
+        return fsdf
+
+    def test_compute_missingness_rate(self):
+        """
+        Test compute_missingness_rate method.
+        :return: None
+        """
+
+        fsdf = self.import_FSDataFrame()
+        features_missing_rates = compute_missingness_rate(fsdf)
+        self.assertEqual(features_missing_rates.get('tr|E9PBJ4'), 0.0)
+        self.assertAlmostEqual(features_missing_rates.get('sp|P07437'), 0.295, places=2)
+
+    def test_filter_by_missingness_rate(self):
+        """
+        Test remove_features_by_missingness_rate method.
+        :return: None
+        """
+
+        fsdf = self.import_FSDataFrame()
+        fsdf = remove_features_by_missingness_rate(fsdf, threshold=0.15)
+        # print number of remaining features
+        print(f"Number of remaining features: {fsdf.count_features()}")
+
+        self.assertEqual(fsdf.count_features(), 6)
+
+    def test_impute_missing(self):
+        """
+        Test impute_missing method. Impute missing values using the mean across columns.
+        :return: None
+        """
+
+        fsdf = self.import_FSDataFrame()
+        fsdf = impute_missing(fsdf, strategy='mean')
+
+        # Collect features as array
+        array = fsdf._collect_features_as_array()
+
+        # Check if there are no missing (NaNs) or null values
+        self.assertFalse(np.isnan(array).any())
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/fsspark/utils/generic.py b/fsspark/utils/generic.py
new file mode 100644
index 0000000..5674998
--- /dev/null
+++ b/fsspark/utils/generic.py
@@ -0,0 +1,24 @@
+"""
+A set of generic functions that are used in the project.
+"""
+
+import functools
+
+
+# Generic decorator to label a function with a specified tag.
+def tag(label: str):
+    """
+    Decorator to tag a function with a specified tag (e.g., experimental).
+
+    :param label: tag label
+    :return: decorator
+    """
+    def decorator(func):
+        # functools.wraps preserves the decorated function's name and docstring.
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            print(f"Tag for {func.__name__}: {label}")
+            return func(*args, **kwargs)
+
+        return wrapper
+    return decorator
diff --git a/images/FSDF_structure.png b/images/FSDF_structure.png
index 19d7ed2..715a448 100644
Binary files a/images/FSDF_structure.png and b/images/FSDF_structure.png differ
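As a minimal sketch of how the new `tag` decorator in `fsspark/utils/generic.py` behaves when applied to a function (`toy_selector` below is a hypothetical example, not part of the codebase):

```python
from fsspark.utils.generic import tag


@tag("experimental")
def toy_selector(threshold: float) -> float:
    """Hypothetical function used only to illustrate the decorator."""
    return threshold * 2


# Prints "Tag for toy_selector: experimental" before running the wrapped
# function, then returns its result (0.3 here).
result = toy_selector(0.15)
```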