diff --git a/fsspark/fs/fdataframe.py b/fsspark/fs/fdataframe.py
index b56d1b1..5a686de 100644
--- a/fsspark/fs/fdataframe.py
+++ b/fsspark/fs/fdataframe.py
@@ -13,13 +13,13 @@ class FSDataFrame:
     """
     FSDataFrame is a representation of a DataFrame with some functionalities to perform feature selection.
-    An object from FSDataFrame is basically represented by a Spark DataFrame with samples
+    An object from FSDataFrame is basically represented by a DataFrame with samples
     as rows and features as columns, with extra distributed indexed pandas series for
     features names and samples labels.
 
     An object of FSDataFrame offers an interface to a DataFrame, a Pandas on Spark DataFrame
     (e.g. suitable for visualization) or a DataFrame with features as a Dense column vector (e.g. suitable for
-    applying most algorithms from Spark MLib API).
+    applying most algorithms from MLib API).
 
     It can also be split in training and testing dataset and filtered by removing selected features (by name or index).
 
@@ -80,7 +80,6 @@ def __init__(
     def _check_df(self):
         """
         Check if input DataFrame meet the minimal requirements to feed an FS pipeline.
-        :return: None
         """
         col_names = self.__df.columns
 
@@ -96,10 +95,8 @@ def _check_df(self):
     def _set_indexed_cols(self) -> Series:
         """
         Create a distributed indexed Series representing features.
-        :return: Pandas on Spark (PoS) Series
         """
-        # TODO: Check for equivalent to pandas distributed Series in Spark.
         non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_name]
         features = [f for f in self.__df.columns if f not in non_features_cols]
         return Series(features)
 
@@ -362,18 +359,45 @@ def split_df(self,
                  label_type_cat: bool = True,
                  split_training_factor: float = 0.7) -> Tuple['FSDataFrame', 'FSDataFrame']:
         """
-        TODO: Split dataframe in training and test dataset, maintaining balance between classes.
         Split DataFrame into training and test dataset.
         It will generate a nearly class-balanced training
         and testing set for both categorical and continuous label input.
 
-        :param label_type_cat: If True (the default), the input label colum will be processed as categorical.
+        :param label_type_cat: If True (the default), the input label column will be processed as categorical.
                                Otherwise, it will be considered a continuous variable and binarized.
         :param split_training_factor: Proportion of the training set. Usually, a value between 0.6 and 0.8.
 
         :return: Tuple of FSDataFrames. First element is the training set and second element is the testing set.
""" + label_col = self.get_label_col_name() + df = self.__df.copy() + + # Create a temporary label column for sampling + tmp_label_col = '_tmp_label_indexed' + + if label_type_cat: + # Use factorize to convert categorical labels to integer indices + df[tmp_label_col], _ = pd.factorize(df[label_col]) + else: + # For continuous labels, create a uniform random column and binarize it + df['_tmp_uniform_rand'] = np.random.rand(len(df)) + df[tmp_label_col] = (df['_tmp_uniform_rand'] > 0.5).astype(int) + df = df.drop(columns=['_tmp_uniform_rand']) + + # Perform stratified sampling to get class-balanced training set + train_df = df.groupby(tmp_label_col, group_keys=False).apply(lambda x: x.sample(frac=split_training_factor)) + + # Get the test set by subtracting the training set from the original DataFrame + test_df = df.drop(train_df.index) + + # Drop the temporary label column + train_df = train_df.drop(columns=[tmp_label_col]) + test_df = test_df.drop(columns=[tmp_label_col]) + + # Return the updated DataFrames + return self.update(train_df), self.update(test_df) + @@ -395,29 +419,29 @@ def update(cls, """ return cls(df, sample_col, label_col, row_index_col) - def _assemble_column_vector(self, +def _assemble_column_vector(self, input_feature_cols: List[str], output_column_vector: str = 'features', drop_input_cols: bool = True) -> pd.DataFrame: - """ - Assemble features (columns) from DataFrame into a column of type Numpy array. + """ + Assemble features (columns) from DataFrame into a column of type Numpy array. - :param drop_input_cols: Boolean flag to drop the input feature columns. - :param input_feature_cols: List of feature column names. - :param output_column_vector: Name of the output column that will contain the combined vector. - :param sdf: Pandas DataFrame + :param drop_input_cols: Boolean flag to drop the input feature columns. + :param input_feature_cols: List of feature column names. + :param output_column_vector: Name of the output column that will contain the combined vector. + :param sdf: Pandas DataFrame - :return: DataFrame with column of type Numpy array. - """ + :return: DataFrame with column of type Numpy array. + """ - # Combine the input columns into a single vector (Numpy array) - self.__df[output_column_vector] = self.__df[input_feature_cols].apply(lambda row: np.array(row), axis=1) + # Combine the input columns into a single vector (Numpy array) + self.__df[output_column_vector] = self.__df[input_feature_cols].apply(lambda row: np.array(row), axis=1) - # Drop input columns if flag is set to True - if drop_input_cols: - return self.__df.drop(columns=input_feature_cols) - else: - return self.__df + # Drop input columns if flag is set to True + if drop_input_cols: + return self.__df.drop(columns=input_feature_cols) + else: + return self.__df def _disassemble_column_vector(self, features_cols: List[str],