Commit

fixed fs methods
enriquea committed Apr 24, 2024
1 parent 6040a0d commit 75fe07b
189 changes: 147 additions & 42 deletions fsspark/fs/methods.py
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
-from typing import List, Type, Union, Tuple
+from typing import List, Type, Union, Tuple, Optional, Dict, Any

from fsspark.fs.constants import (ML_METHODS, UNIVARIATE_METHODS,
                                  MULTIVARIATE_METHODS)
@@ -252,6 +252,7 @@ def __init__(self,

        # set the estimator, grid and cv parameters (or none if not provided)
        self.estimator_params = kwargs.get('estimator_params', None)  # estimator parameters
+        self.evaluator_params = kwargs.get('evaluator_params', None)  # evaluator parameters
        self.grid_params = kwargs.get('grid_params', None)  # grid parameters
        self.cv_params = kwargs.get('cv_params', None)  # cross-validation parameters

@@ -263,6 +264,14 @@ def __init__(self,
        self.percent_to_keep = percent_to_keep
        self.rfe_iterations = rfe_iterations

+        # performance metrics
+        self.rfe_training_metric: list = []  # performance metrics on training for each rfe iteration
+        self.training_metric = None  # performance metrics on training (final model)
+        self.testing_metric = None  # performance metrics on testing (final model)
+
+        # feature importance
+        self.feature_scores = None

    def validate_method(self, fs_method: str):
        """
        Validate the machine learning method.
@@ -299,12 +308,29 @@ def _set_ml_model(self):
        self._ml_model = MLCVModel.create_model(
            model_type=model_type,
            estimator_params=self.estimator_params,
+            evaluator_params=self.evaluator_params,
            grid_params=self.grid_params,
            cv_params=self.cv_params
        )

        return self._ml_model
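
The four parameter dictionaries above flow straight into MLCVModel.create_model. A minimal sketch of what they might contain, assuming Spark ML style estimator/evaluator/CrossValidator settings; the exact keys MLCVModel accepts are an assumption, not shown in this commit:

# Hypothetical parameter dictionaries for MLCVModel.create_model; the keys
# mirror common Spark ML settings and are illustrative assumptions only.
estimator_params = {'labelCol': 'label', 'featuresCol': 'features'}
evaluator_params = {'metricName': 'areaUnderROC'}
grid_params = {'numTrees': [10, 50], 'maxDepth': [5, 10]}
cv_params = {'numFolds': 3}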

+    def _fit_and_filter(self, df: FSDataFrame) -> FSDataFrame:
+
+        # fit the current machine learning model
+        self._ml_model.fit(df)
+
+        # get feature scores
+        feature_scores = self._ml_model.get_feature_scores()
+
+        # select features based on the (percentile) threshold provided;
+        # expects a dataframe sorted by scores in descending order
+        selected_features = feature_scores.iloc[
+            :int(self.percent_to_keep * len(feature_scores))
+        ]['feature_index']
+
+        return df.filter_features_by_index(selected_features, keep=True)
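
Since _fit_and_filter keeps the top fraction of a score-sorted frame, the percentile cut is easy to verify in isolation. A self-contained illustration using plain pandas rather than the FSDataFrame API:

import pandas as pd

# feature scores sorted in descending order, as _fit_and_filter expects
feature_scores = pd.DataFrame({
    'feature_index': [3, 0, 2, 1],
    'score': [0.9, 0.7, 0.4, 0.1],
})

percent_to_keep = 0.5
# int(0.5 * 4) == 2, so the two best-scoring features are kept
selected = feature_scores.iloc[:int(percent_to_keep * len(feature_scores))]['feature_index']
print(list(selected))  # [3, 0]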

    def select_features(self, fsdf: FSDataFrame) -> FSDataFrame:
        """
        Select features using the specified machine learning method.
@@ -319,41 +345,91 @@ def select_features(self, fsdf: FSDataFrame) -> FSDataFrame:
        if fsdf is None or fsdf.count_features() == 0 or fsdf.count_instances() == 0:
            raise ValueError("The data frame is empty or does not contain any features.")

-        # ml_model = self._ml_model
-        performance_metrics = []
-
-        def _fit_and_filter(df: FSDataFrame) -> FSDataFrame:
-
-            # fit the model
-            self._ml_model.fit(df)
-
-            # get feature scores
-            feature_scores = self._ml_model.get_feature_scores()
-
-            # get feature based on the (percentile) threshold provided
-            # expected a dataframe sorted by scores in descending order
-            selected_features = feature_scores.iloc[
-                :int(self.percent_to_keep * len(feature_scores))
-            ]['feature_index']
-
-            # get accuracy of the model
-            accuracy = self._ml_model.get_accuracy()
-            performance_metrics.append(accuracy)
-
-            return df.filter_features_by_index(selected_features, keep=True)
-
-        # Recursive feature elimination
-        if self.rfe:
-            for iteration in range(self.rfe_iterations):
-                print(f"RFE: running {iteration + 1} of {self.rfe_iterations} iterations...")
-                fsdf = _fit_and_filter(fsdf)
-            # print the performance metrics
-            print(f"Performance metrics (accuracy): {performance_metrics}")
-            return fsdf
-        else:
-            fsdf = _fit_and_filter(fsdf)
-            # print the performance metrics
-            print(f"Performance metrics (accuracy): {performance_metrics}")
-            return fsdf
+        fsdf = self._fit_and_filter(fsdf)
+
+        # Recursive feature elimination
+        if self.rfe:
+            for iteration in range(self.rfe_iterations):
+                print(f"RFE: running {iteration + 1} of {self.rfe_iterations} iterations...")
+                fsdf = self._fit_and_filter(fsdf)
+                # collect the performance metrics on training for every rfe iteration
+                self.rfe_training_metric.append(self._ml_model.get_eval_metric_on_training())
+
+        # get the final performance metric on training
+        self.training_metric = self._ml_model.get_eval_metric_on_training()
+
+        # get the feature scores after feature selection
+        self.feature_scores = self._ml_model.get_feature_scores()
+
+        return fsdf
+
+    def get_eval_metric_name(self):
+        """
+        Get the evaluation metric name.
+
+        Returns:
+            The evaluation metric name.
+        """
+
+        if self._ml_model is None:
+            raise ValueError("No machine learning model is available.")
+
+        return self._ml_model.get_eval_metric_name()

+    def get_eval_metric_on_training_rfe(self):
+        """
+        Get the evaluation metric on the training data for each RFE iteration.
+
+        Returns:
+            The evaluation metric on the training data for each RFE iteration.
+        """
+        if self.rfe_training_metric is None:
+            raise ValueError("No training metric is available. Run the select_features method first.")
+        return self.rfe_training_metric
+
+    def get_eval_metric_on_training(self):
+        """
+        Get the evaluation metric on the training data.
+
+        Returns:
+            The evaluation metric on the training data.
+        """
+        if self.training_metric is None:
+            raise ValueError("No training metric is available. Run the select_features method first.")
+        return self.training_metric
+
+    def get_eval_metric_on_testing(self, fsdf: FSDataFrame):
+        """
+        Evaluate the machine learning method on the testing data.
+
+        Parameters:
+            fsdf: The testing data frame on which the machine learning method is to be evaluated.
+
+        Returns:
+            The evaluation metric on the testing data.
+        """
+
+        if fsdf is None or fsdf.count_features() == 0 or fsdf.count_instances() == 0:
+            raise ValueError("The testing data frame is empty or does not contain any features.")
+
+        # evaluate the model on the testing data
+        eval_metric = self._ml_model.get_eval_metric_on_testing(fsdf)
+        self.testing_metric = eval_metric
+
+        return eval_metric
+
+    def get_feature_scores(self):
+        """
+        Get the feature scores after feature selection.
+
+        Returns:
+            The feature scores as a pandas DataFrame.
+        """
+
+        if self.feature_scores is None:
+            raise ValueError("Feature scores are not available. Run the feature selection method first.")
+
+        return self.feature_scores

    def __str__(self):
        return f"FSMLMethod(method={self.fs_method}, kwargs={self.kwargs})"
@@ -380,17 +456,26 @@ class FSPipeline:
                                     FSMultivariate,
                                     FSMLMethod]

-    def __init__(self, fs_stages: List[Union[FSUnivariate, FSMultivariate, FSMLMethod]]):
+    def __init__(self,
+                 df_training: FSDataFrame,
+                 df_testing: Optional[FSDataFrame],
+                 fs_stages: List[Union[FSUnivariate, FSMultivariate, FSMLMethod]]):
        """
        Initialize the feature selection pipeline with the specified feature selection methods.

        Parameters:
+            df_training: The training data frame on which the feature selection pipeline is to be run.
+            df_testing: The testing data frame on which the ML wrapper method (if any) is to be evaluated.
            fs_stages: A list of feature selection methods to be used in the pipeline.
        """

+        self.df_training = df_training
+        self.df_testing = df_testing
        self.fs_stages = fs_stages
        self.validate_methods()

+        self.pipeline_results = {}

    def validate_methods(self):
        """
        Validate the feature selection methods in the pipeline.
@@ -409,26 +494,46 @@ def validate_methods(self):
        if len(ml_methods) > 1:
            raise ValueError("Only one ML method is allowed in the pipeline.")

-    def run(self, fsdf: FSDataFrame):
+    def run(self) -> Dict[str, Any]:
        """
-        Run the feature selection pipeline on the specified data frame.
-
-        Parameters:
-            fsdf: The data frame on which the feature selection pipeline is to be run.
+        Run the feature selection pipeline.

        Returns:
-            The selected features.
+            A dictionary with the results of the feature selection pipeline.
        """

-        # apply each feature selection method in the pipeline
-        # print the stage of the pipeline being executed
+        # apply each feature selection method in the pipeline sequentially
        n_stages = len(self.fs_stages)
-        fsdf_tmp = fsdf
+        fsdf_tmp = self.df_training
+
+        self.pipeline_results.update(n_stages=n_stages)

        for i, method in enumerate(self.fs_stages):
            print(f"Running stage {i + 1} of {n_stages} of the feature selection pipeline: {method}")
-            fsdf_tmp = method.select_features(fsdf_tmp)
+            if isinstance(method, FSMLMethod):
+
+                fsdf_tmp = method.select_features(fsdf_tmp)
+
+                # collect the results during the feature selection process (rfe iterations, feature scores, etc.)
+                self.pipeline_results.update(rfe_iterations=method.rfe_iterations)
+                self.pipeline_results.update(feature_scores=method.get_feature_scores())
+                self.pipeline_results.update(eval_metric=method.get_eval_metric_name())
+                self.pipeline_results.update(rfe_training_metric=method.get_eval_metric_on_training_rfe())
+                self.pipeline_results.update(training_metric=method.get_eval_metric_on_training())
+
+                if self.df_testing is not None:
+
+                    # evaluate the final model on the testing data (if available)
+                    testing_metric = method.get_eval_metric_on_testing(self.df_testing)
+                    self.pipeline_results.update(testing_metric=testing_metric)
+
+            else:
+                fsdf_tmp = method.select_features(fsdf_tmp)
+
+        self.pipeline_results.update(n_initial_features=self.df_training.count_features())
+        self.pipeline_results.update(n_selected_features=fsdf_tmp.count_features())

-        return fsdf_tmp
+        return self.pipeline_results

    def __str__(self):
        return f"FSPipeline(fs_methods={self.fs_stages})"
