Commit

first iteration of pandas fdataframe.py
ypriverol committed Sep 19, 2024
1 parent deb2df6 commit 70fec44
Showing 1 changed file with 47 additions and 23 deletions.
70 changes: 47 additions & 23 deletions fsspark/fs/fdataframe.py
@@ -13,13 +13,13 @@
class FSDataFrame:
"""
FSDataFrame is a representation of a DataFrame with some functionality to perform feature selection.
An object of FSDataFrame is basically represented by a DataFrame with samples
as rows and features as columns, with extra indexed pandas Series for
feature names and sample labels.
An object of FSDataFrame offers an interface to a pandas DataFrame
(e.g. suitable for visualization) or a DataFrame with features as a dense column vector (e.g. suitable for
applying most algorithms from the MLlib API).
It can also be split into training and testing datasets and filtered by removing selected features (by name or index).
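For orientation, here is a minimal sketch of the kind of pandas DataFrame an FSDataFrame is expected to wrap and how it might be constructed; the column names and the constructor signature are assumptions inferred from the update() classmethod shown further below, not confirmed by this diff:

    import pandas as pd

    # Hypothetical layout: samples as rows, features as columns,
    # plus dedicated sample-id and label columns.
    df = pd.DataFrame({
        'sample_id': ['s1', 's2', 's3', 's4'],
        'label': ['a', 'b', 'a', 'b'],
        'feat_1': [0.1, 0.4, 0.2, 0.9],
        'feat_2': [1.0, 0.5, 0.3, 0.7],
    })

    # Assumed constructor signature, mirroring FSDataFrame.update(df, sample_col, label_col, row_index_col);
    # '_row_index' is a made-up row-index column name.
    fsdf = FSDataFrame(df, 'sample_id', 'label', '_row_index')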
@@ -80,7 +80,6 @@ def __init__(
def _check_df(self):
"""
Check if the input DataFrame meets the minimal requirements to feed an FS pipeline.
:return: None
"""
col_names = self.__df.columns
@@ -96,10 +95,8 @@ def _check_df(self):
def _set_indexed_cols(self) -> Series:
"""
Create an indexed Series representing features.
:return: pandas Series
"""
# TODO: Check for equivalent to pandas distributed Series in .
non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_name]
features = [f for f in self.__df.columns if f not in non_features_cols]
return Series(features)
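To illustrate the selection above, a small self-contained sketch (the column names are hypothetical):

    import pandas as pd

    # Hypothetical columns of the wrapped DataFrame; the first three stand in for the
    # configured sample, label and row-index columns and are excluded from the features.
    columns = ['sample_id', 'label', '_row_index', 'feat_1', 'feat_2']
    non_features_cols = ['sample_id', 'label', '_row_index']
    features = pd.Series([c for c in columns if c not in non_features_cols])
    # features -> 0: 'feat_1', 1: 'feat_2'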
@@ -362,18 +359,45 @@ def split_df(self,
label_type_cat: bool = True,
split_training_factor: float = 0.7) -> Tuple['FSDataFrame', 'FSDataFrame']:
"""
Split the DataFrame into training and test datasets, maintaining balance between classes.
It will generate a nearly class-balanced training
and testing set for both categorical and continuous label input.
:param label_type_cat: If True (the default), the input label column will be processed as categorical.
Otherwise, it will be considered a continuous variable and binarized.
:param split_training_factor: Proportion of the training set. Usually, a value between 0.6 and 0.8.
:return: Tuple of FSDataFrames. First element is the training set and second element is the testing set.
"""

label_col = self.get_label_col_name()
df = self.__df.copy()

# Create a temporary label column for sampling
tmp_label_col = '_tmp_label_indexed'

if label_type_cat:
# Use factorize to convert categorical labels to integer indices
df[tmp_label_col], _ = pd.factorize(df[label_col])
else:
# For continuous labels, create a uniform random column and binarize it
df['_tmp_uniform_rand'] = np.random.rand(len(df))
df[tmp_label_col] = (df['_tmp_uniform_rand'] > 0.5).astype(int)
df = df.drop(columns=['_tmp_uniform_rand'])

# Perform stratified sampling to get class-balanced training set
train_df = df.groupby(tmp_label_col, group_keys=False).apply(lambda x: x.sample(frac=split_training_factor))

# Get the test set by subtracting the training set from the original DataFrame
test_df = df.drop(train_df.index)

# Drop the temporary label column
train_df = train_df.drop(columns=[tmp_label_col])
test_df = test_df.drop(columns=[tmp_label_col])

# Return the updated DataFrames
return self.update(train_df), self.update(test_df)
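A hedged usage sketch of the method above; `fsdf` is an already constructed FSDataFrame and the split factors are example values only:

    # Categorical label: stratified ~80/20 split on the factorized label.
    train_fsdf, test_fsdf = fsdf.split_df(label_type_cat=True, split_training_factor=0.8)

    # Continuous label: the method binarizes a temporary uniform random column before sampling.
    train_fsdf, test_fsdf = fsdf.split_df(label_type_cat=False, split_training_factor=0.7)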




@@ -395,29 +419,29 @@ def update(cls,
"""
return cls(df, sample_col, label_col, row_index_col)

def _assemble_column_vector(self,
input_feature_cols: List[str],
output_column_vector: str = 'features',
drop_input_cols: bool = True) -> pd.DataFrame:
"""
Assemble features (columns) from the DataFrame into a column of type Numpy array.

:param input_feature_cols: List of feature column names.
:param output_column_vector: Name of the output column that will contain the combined vector.
:param drop_input_cols: Boolean flag to drop the input feature columns.

:return: DataFrame with a column of type Numpy array.
"""

# Combine the input columns into a single vector (Numpy array)
self.__df[output_column_vector] = self.__df[input_feature_cols].apply(lambda row: np.array(row), axis=1)

# Drop the input columns if the flag is set to True
if drop_input_cols:
return self.__df.drop(columns=input_feature_cols)
else:
return self.__df
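A minimal usage sketch; calling this private helper directly and the feature names are for illustration only:

    # Hypothetical call on an FSDataFrame whose wrapped DataFrame has 'feat_1' and 'feat_2'.
    df_with_vector = fsdf._assemble_column_vector(
        input_feature_cols=['feat_1', 'feat_2'],
        output_column_vector='features',
        drop_input_cols=True,
    )
    # Each row of df_with_vector['features'] now holds a numpy array, e.g. array([0.1, 1.0]).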

def _disassemble_column_vector(self,
features_cols: List[str],