PR response to issue #8 #9

Merged
merged 8 commits · Jan 26, 2024
2 changes: 1 addition & 1 deletion README.md
@@ -41,4 +41,4 @@ pip install . -r requirements.txt

### Maintainers
- Enrique Audain (https://github.com/enriquea)
- Yasset Perez-Riverol (https://github.com/ypriverol
- Yasset Perez-Riverol (https://github.com/ypriverol)
12 changes: 6 additions & 6 deletions docs/README.data.md
@@ -9,7 +9,7 @@ Here we describe the main data structures used in `fsspark` and how to use them.

The current module supports as input a tab-separated values (TSV) file with a header row and `S x 2+F` dimensions,
where `S` is the number of samples (rows) and `F` is the number of features (columns). The first column of the file
is expected to contain the `sample IDs`, the second column the `response variable` and the remaining
is expected to contain the `sample IDs`, the second column the `sample label` and the remaining
columns the `features`. The response variable can be binary, categorical, or continuous; it should
be encoded as `0` and `1` for binary variables, as integers for categorical variables, and as floats
for continuous variables.
@@ -20,12 +20,12 @@ The following is an example of a TSV file with a binary response variable:

```
------------------------------------------------------------------------
| sample_id | response | feature_1 | feature_2 | feature_3 | feature_4 |
| sample_id | label | feature_1 | feature_2 | feature_3 | feature_4 |
------------------------------------------------------------------------
| sample_1 | 0 | 0.1 | 0.2 | 0.3 | 0.4 |
| sample_2 | 1 | 0.5 | 0.6 | 0.7 | 0.8 |
| sample_3 | 0 | 0.9 | 0.10 | 0.11 | 0.12 |
| sample_4 | 1 | 0.13 | 0.14 | 0.15 | 0.16 |
| sample_1 | a | 0.1 | 0.2 | 0.3 | 0.4 |
| sample_2 | b | 0.5 | 0.6 | 0.7 | 0.8 |
| sample_3 | b | 0.9 | 0.10 | 0.11 | 0.12 |
| sample_4 | c | 0.13 | 0.14 | 0.15 | 0.16 |
------------------------------------------------------------------------

```
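For orientation, a table with this layout maps onto the loader imported later in this PR (see `import_table_as_psdf` and `FSDataFrame` in docs/README.methods.md). The sketch below is illustrative only: the file path and the exact call signature of `import_table_as_psdf` are assumptions, not part of this diff.

```python
# Sketch only: importing a TSV with the layout shown above.
# The path 'data.tsv.gz' and the loader call are illustrative assumptions.
from fsspark.fs.core import FSDataFrame
from fsspark.utils.io import import_table_as_psdf

psdf = import_table_as_psdf('data.tsv.gz')   # read the TSV as a pandas-on-Spark DataFrame
fsdf = FSDataFrame(psdf,
                   sample_col='sample_id',   # first column: sample IDs
                   label_col='label')        # second column: sample label
```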
5 changes: 2 additions & 3 deletions docs/README.methods.md
@@ -69,7 +69,7 @@ from fsspark.fs.core import FSDataFrame
from fsspark.fs.ml import cv_rf_classification, get_accuracy, get_predictions
from fsspark.fs.multivariate import multivariate_filter
from fsspark.fs.univariate import univariate_filter
from fsspark.fs.utils import (filter_missingness_rate,
from fsspark.fs.utils import (remove_features_by_missingness_rate,
impute_missing)
from fsspark.utils.datasets import get_tnbc_data_path
from fsspark.utils.io import import_table_as_psdf
@@ -86,8 +86,7 @@ fsdf = FSDataFrame(fsdf, sample_col='Sample', label_col='label')
# Step 1. Data pre-processing.

# a) Filter missingness rate
fsdf = filter_missingness_rate(fsdf,
threshold=0.1)
fsdf = remove_features_by_missingness_rate(fsdf, threshold=0.1)

# b) Impute data frame
fsdf = impute_missing(fsdf)
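The excerpt above also imports `univariate_filter`, so a plausible continuation of the pre-processing pipeline is sketched below. The method name comes from the options documented in `fsspark/fs/univariate.py` further down in this diff; the call itself is an assumption, not part of the PR.

```python
# Sketch only (not part of this PR): Step 2, univariate feature selection.
# 'anova' is one of the documented options ('u_corr', 'anova', 'f_regression')
# and targets a categorical label.
fsdf = univariate_filter(fsdf, univariate_method='anova')
```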
13 changes: 13 additions & 0 deletions fsspark/fs/core.py
@@ -1,4 +1,5 @@
import logging
import numpy as np
from typing import (Union,
Optional,
List,
@@ -203,6 +204,18 @@ def get_sdf_vector(self, output_column_vector: str = 'features') -> pyspark.sql.

return sdf_vector

def _collect_features_as_array(self) -> np.array:
"""
Collect features from FSDataFrame as an array.
`Warning`: This method will collect the entire DataFrame into the driver.
Use this method on small datasets only (e.g., after filtering or splitting the data).

:return: Numpy array
"""
sdf = self.get_sdf().select(*self.get_features_names())
a = np.array(sdf.collect())
return a

def to_psdf(self) -> pyspark.pandas.DataFrame:
"""
Convert Spark DataFrame to Pandas on Spark DataFrame
Expand Down
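The `_collect_features_as_array` helper added above materializes every feature value on the driver, so it is only appropriate for small, already filtered or split data. A hedged usage sketch follows; `fsdf` is an assumed small FSDataFrame, not an object defined in this diff.

```python
# Sketch only: inspecting the feature matrix of a small FSDataFrame.
# 'fsdf' is an assumed, already-filtered FSDataFrame; collect() runs on the driver.
import numpy as np

features = fsdf._collect_features_as_array()        # one row per sample, one column per feature
print(features.shape)                                # (n_samples, n_features)
print(np.nanmean(features.astype(float), axis=0))    # per-feature means, ignoring missing values
```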
26 changes: 15 additions & 11 deletions fsspark/fs/ml.py
@@ -19,10 +19,13 @@
)
from pyspark.ml.regression import RandomForestRegressor, FMRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.pandas import DataFrame

from fsspark.fs.core import FSDataFrame
from fsspark.utils.generic import tag


@tag("spark implementation")
def cv_rf_classification(
fsdf: FSDataFrame, binary_classification: bool = True
) -> CrossValidatorModel:
@@ -34,7 +37,6 @@ def cv_rf_classification(
Otherwise, implement a multi-class classification problem.

:return: CrossValidatorModel
TODO: Consider here if make sense to return the full CV Model.
"""
features_col = "features"
sdf = fsdf.get_sdf_vector(output_column_vector=features_col)
@@ -69,6 +71,7 @@
return cv_model


@tag("spark implementation")
def cv_svc_classification(
fsdf: FSDataFrame,
) -> CrossValidatorModel:
@@ -79,7 +82,6 @@ def cv_svc_classification(
:param fsdf: FSDataFrame

:return: CrossValidatorModel
TODO: Consider here if make sense to return the full CV Model.
"""

features_col = "features"
@@ -108,6 +110,7 @@
return cv_model


@tag("spark implementation")
def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
"""
Cross-validation with Random Forest regressor as estimator.
@@ -116,7 +119,6 @@ def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
:param fsdf: FSDataFrame

:return: CrossValidatorModel
TODO: Consider here if make sense to return the full CV Model.
"""

features_col = "features"
@@ -148,6 +150,7 @@
return cv_model


@tag("spark implementation")
def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
"""
Cross-validation with Factorization Machines as estimator.
@@ -156,7 +159,6 @@ def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
:param fsdf: FSDataFrame

:return: CrossValidatorModel
TODO: Do it make sense here to return the full CV Model??
"""

features_col = "features"
@@ -184,12 +186,14 @@

def get_accuracy(model: CrossValidatorModel) -> float:
"""
Get accuracy from a trained CrossValidatorModel (best model).
# TODO: This function should be able to parse all available models.
Currently only supports RandomForestClassificationModel.

:param model: Trained CrossValidatorModel
:return: Training accuracy
:return: accuracy
"""

best_model = model.bestModel
if isinstance(best_model, RandomForestClassificationModel):
acc = best_model.summary.accuracy
@@ -200,7 +204,7 @@ def get_accuracy(model: CrossValidatorModel) -> float:
return acc


def get_predictions(model: CrossValidatorModel) -> pyspark.sql.DataFrame:
def get_predictions(model: CrossValidatorModel) -> pyspark.pandas.DataFrame:
"""
# TODO: This function should be able to parse all available models.
Currently only supports RandomForestClassificationModel.
@@ -219,11 +223,11 @@ def get_predictions(model: CrossValidatorModel) -> pyspark.sql.DataFrame:
)
else:
pred = None
return pred
return pred.pandas_api()


def get_feature_scores(model: CrossValidatorModel,
indexed_features: pyspark.pandas.series.Series = None) -> pd.DataFrame:
indexed_features: pyspark.pandas.series.Series = None) -> pyspark.pandas.DataFrame:
"""
Extract features scores (e.g. importance or coefficients) from a trained CrossValidatorModel.

@@ -234,7 +238,7 @@ def get_feature_scores(model: CrossValidatorModel,
:param indexed_features: If provided, report feature names rather than feature indices.
Usually, the output from `training_data.get_features_indexed()`.

:return: Pandas on DataFrame with feature importance
:return: Pandas-on-Spark DataFrame with feature importance
"""

df_features = (None if indexed_features is None
@@ -279,5 +283,5 @@ def get_feature_scores(model: CrossValidatorModel,
return df.sort_values(by="coefficients", ascending=False)

else:
df = None # this should follow with parsing options for the different models.
return df
# TODO: here we should support other models.
pass
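Taken together, the functions touched in this file suggest the following workflow. This is a sketch under the signatures shown in the diff; `fsdf` is an assumed, already pre-processed FSDataFrame with a binary label, and none of this code is part of the PR itself.

```python
# Sketch only: cross-validated Random Forest classification plus the helper
# getters modified in this file. 'fsdf' is an assumed FSDataFrame.
from fsspark.fs.ml import (cv_rf_classification, get_accuracy,
                           get_predictions, get_feature_scores)

cv_model = cv_rf_classification(fsdf, binary_classification=True)

accuracy = get_accuracy(cv_model)         # accuracy of the best RF model
predictions = get_predictions(cv_model)   # now returned as a pandas-on-Spark DataFrame
scores = get_feature_scores(cv_model)     # feature scores (importances for RF)

print(accuracy)
print(scores.head(10))
```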
14 changes: 11 additions & 3 deletions fsspark/fs/multivariate.py
@@ -8,25 +8,31 @@

from fsspark.fs.core import FSDataFrame
from fsspark.fs.utils import find_maximal_independent_set
from fsspark.utils.generic import tag

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger("FSSPARK:MULTIVARIATE")
logger.setLevel(logging.INFO)


@tag("experimental")
def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame,
features_col: str = 'features',
method: str = "pearson") -> np.ndarray:
"""
Compute the feature correlation matrix.
TODO: Warning: Computed matrix correlation will collected into the drive with this implementation.

:param sdf: Spark DataFrame
:param features_col: Name of the feature column vector name.
:param method: One of `pearson` (default) or `spearman`.

:return: Numpy array.
"""

logger.warning("Warning: Computed matrix correlation will be collected into the drive with this implementation.\n"
"This may cause memory issues. Use it preferably with small datasets.")
logger.info(f"Computing correlation matrix using {method} method.")

mcorr = (Correlation
.corr(sdf, features_col, method)
.collect()[0][0]
@@ -35,6 +41,7 @@ def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame,
return mcorr


@tag("experimental")
def multivariate_correlation_selector(fsdf: FSDataFrame,
strict: bool = True,
corr_threshold: float = 0.75,
@@ -43,9 +50,9 @@ def multivariate_correlation_selector(fsdf: FSDataFrame,
Compute the correlation matrix (Pearson) among input features and select those below a specified threshold.

:param fsdf: Input FSDataFrame
:param strict: If True (default), apply hard filtering (strict) to remove highly related features.
:param strict: If True (default), apply hard filtering (strict) to remove highly correlated features.
Otherwise, find the maximal independent set of highly correlated features (approximate method).
The approximate method is experimental.
`Warning`: The approximate method is experimental.
:param corr_threshold: Minimal correlation threshold to consider two features correlated.
:param method: One of `pearson` (default) or `spearman`.

@@ -84,6 +91,7 @@ def multivariate_correlation_selector(fsdf: FSDataFrame,
return selected_features


@tag("spark implementation")
def multivariate_variance_selector(fsdf: FSDataFrame,
variance_threshold: float = 0.0) -> List[str]:
"""
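A note on the warning introduced above: the correlation matrix is `F x F` and is collected to the driver, so memory grows roughly as `F**2 * 8` bytes for float64 features. Below is a hedged usage sketch of the selector, with parameter values mirroring the defaults shown in this diff; `fsdf` is an assumed FSDataFrame, not defined in the PR.

```python
# Sketch only: keep features whose pairwise Pearson correlation stays below
# the threshold. Values mirror the defaults shown in this diff.
from fsspark.fs.multivariate import multivariate_correlation_selector

selected = multivariate_correlation_selector(fsdf,
                                             strict=True,        # hard filtering
                                             corr_threshold=0.75,
                                             method='pearson')
print(f"{len(selected)} features selected")
```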
15 changes: 10 additions & 5 deletions fsspark/fs/univariate.py
@@ -5,15 +5,17 @@
from pyspark.ml.feature import UnivariateFeatureSelector

from fsspark.fs.core import FSDataFrame
from fsspark.utils.generic import tag

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger("FSSPARK:UNIVARIATE")
logger.setLevel(logging.INFO)


@tag("spark implementation")
def compute_univariate_corr(fsdf: FSDataFrame) -> Dict[str, float]:
"""
Compute the correlation coefficient between every column (features) in the input DataFrame and a defined target.
Compute the correlation coefficient between every column (features) in the input DataFrame and the label (class).

:param fsdf: Input FSDataFrame

@@ -32,7 +34,7 @@ def compute_univariate_corr(fsdf: FSDataFrame) -> Dict[str, float]:
def univariate_correlation_selector(fsdf: FSDataFrame,
corr_threshold: float = 0.3) -> List[str]:
"""
Select features based on its correlation with a target label, if corr value is less than a specified threshold.
Select features based on their correlation with a label (class); a feature is kept if its correlation value is less than a specified threshold.
Both the features and the label are expected to be numeric.

:param fsdf: FSDataFrame
@@ -46,6 +48,7 @@ def univariate_correlation_selector(fsdf: FSDataFrame,
return selected_features


@tag("spark implementation")
def univariate_selector(fsdf: FSDataFrame,
label_type: str = 'categorical',
**kwargs) -> List[str]:
@@ -63,7 +66,7 @@ def univariate_selector(fsdf: FSDataFrame,
"""

vector_col_name = 'features'
vsdf = fsdf.get_sdf_vector(output_column_vector=vector_col_name)
sdf = fsdf.get_sdf_vector(output_column_vector=vector_col_name)
label = fsdf.get_label_col_name()

# set selector
@@ -85,13 +88,14 @@
.setLabelCol(label)
)

model = selector.fit(vsdf)
model = selector.fit(sdf)
selected_features_indices = model.selectedFeatures
selected_features = fsdf.get_features_by_index(selected_features_indices)

return selected_features


@tag("spark implementation")
def univariate_filter(fsdf: FSDataFrame,
univariate_method: str = 'u_corr',
**kwargs) -> FSDataFrame:
@@ -100,7 +104,8 @@

:param fsdf: Input FSDataFrame
:param univariate_method: Univariate selector method.
Possible values are 'u_corr', 'anova' or 'f_regression'.
Possible values are 'u_corr', 'anova' (categorical label)
or 'f_regression' (continuous label).

:return: Filtered FSDataFrame
"""
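The docstring change above pairs each univariate method with a label type. The sketch below shows how the two entry points touched in this file might be called; the thresholds and the `fsdf_*` objects are illustrative assumptions, not part of the diff.

```python
# Sketch only: the two univariate entry points touched in this file.
# 'fsdf_categorical' and 'fsdf_continuous' are assumed FSDataFrames.
from fsspark.fs.univariate import univariate_correlation_selector, univariate_filter

# Correlation-based selection: keeps features whose correlation with the
# (numeric) label is below the threshold.
selected = univariate_correlation_selector(fsdf_continuous, corr_threshold=0.3)

# Wrapper: pick the method that matches the label type.
fsdf_cat = univariate_filter(fsdf_categorical, univariate_method='anova')         # categorical label
fsdf_con = univariate_filter(fsdf_continuous, univariate_method='f_regression')   # continuous label
```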