
Commit

Merge remote-tracking branch 'origin/refactor-py' into refactor-py
# Conflicts:
#	requirements.txt
ypriverol committed Sep 20, 2024
2 parents 471dafa + 30e0659 commit 66d6118
Showing 2 changed files with 144 additions and 157 deletions.
fsspark/fs/fdataframe.py: 148 changes (79 additions, 69 deletions)
@@ -10,6 +10,7 @@
 logger = logging.getLogger("pickfeat")
 logger.setLevel(logging.INFO)
 
+
 class FSDataFrame:
     """
     FSDataFrame is a representation of a DataFrame with some functionalities to perform feature selection.
@@ -188,7 +189,7 @@ def _collect_features_as_array(self) -> np.array:
         :return: Numpy array
         """
-        sdf = self.get_sdf().select(*self.get_features_names())
+        sdf = self.get_df().select(*self.get_features_names())
         a = np.array(sdf.collect())
         return a
 
@@ -199,8 +200,8 @@ def to_psdf(self) -> DataFrame:
         """
         return self.__df.pandas_api()
 
-    def get_sdf(self) -> DataFrame:
-        return self.__df
+    def get_df(self) -> DataFrame:
+        return self.__df
 
     def get_sample_col_name(self) -> str:
         """
@@ -235,7 +236,7 @@ def _add_row_index(self, index_name: str = '_row_index') -> pd.DataFrame:
         :return: DataFrame with extra column of row indices.
         """
         # Add a new column with unique row indices using a range
-        self.__df[index_name] = range(len(self.__df))
+        self.__df[index_name] = list(range(len(self.__df)))
         return self.__df
 
     def count_features(self) -> int:
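For reference, a minimal sketch of what the row-index step above produces, assuming the underlying data is a plain pandas DataFrame (the feat_a/feat_b columns are illustrative):

import pandas as pd

df = pd.DataFrame({"feat_a": [0.1, 0.2, 0.3], "feat_b": [1.0, 2.0, 3.0]})
df["_row_index"] = list(range(len(df)))  # one unique, sequential index per row: 0, 1, 2
print(df)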
@@ -271,7 +272,7 @@ def filter_features(self, features: List[str], keep: bool = True) -> 'FSDataFrame':
             return self
 
         count_a = self.count_features()
-        sdf = self.get_sdf()
+        sdf = self.get_df()
 
         if keep:
             sdf = sdf.select(
@@ -333,23 +334,14 @@ def scale_features(self, scaler_method: str = 'standard', **kwargs) -> 'FSDataFrame':
         else:
             raise ValueError("`scaler_method` must be one of: min_max, max_abs, standard or robust.")
 
-        features_col_vector = '_features'
-        scaled_features_vector = '_features_scaled'
+        feature_array = self._features_to_array()
 
-        sdf = self.get_sdf_vector(output_column_vector=features_col_vector)
+        feature_array = (scaler
+                         .fit(feature_array)
+                         .transform()
+                         )
 
-        sdf = (scaler
-               .setInputCol(features_col_vector)
-               .setOutputCol(scaled_features_vector)
-               .fit(sdf)
-               .transform(sdf)
-               .drop(features_col_vector)
-               )
-
-        sdf = _disassemble_column_vector(sdf,
-                                         features_cols=self.get_features_names(),
-                                         col_vector_name=scaled_features_vector,
-                                         drop_col_vector=True)
+        df_scaled = self._array_to_features(feature_array)
 
         return self.update(sdf,
                            self.__sample_col,
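For context, a minimal sketch of the array-based scaling flow the hunk above moves toward, assuming scikit-learn scalers (StandardScaler shown; the f1/f2 data is illustrative). Note that scikit-learn's transform() takes the array to be transformed as its argument, and fit_transform() combines both steps:

import pandas as pd
from sklearn.preprocessing import StandardScaler

features = pd.DataFrame({"f1": [1.0, 2.0, 3.0], "f2": [10.0, 20.0, 30.0]})

scaler = StandardScaler()
feature_array = features.to_numpy()                                  # features -> NumPy array
scaled_array = scaler.fit(feature_array).transform(feature_array)    # or scaler.fit_transform(feature_array)
df_scaled = pd.DataFrame(scaled_array, columns=features.columns)     # array -> features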
@@ -399,9 +391,6 @@ def split_df(self,
         # Return the updated DataFrames
         return self.update(train_df), self.update(test_df)
 
-
-
-
     @classmethod
     def update(cls,
                df: DataFrame,
@@ -420,51 +409,72 @@ def update(cls,
         """
         return cls(df, sample_col, label_col, row_index_col)
 
-    def _assemble_column_vector(self,
-                                input_feature_cols: List[str],
-                                output_column_vector: str = 'features',
-                                drop_input_cols: bool = True) -> pd.DataFrame:
-        """
-        Assemble features (columns) from DataFrame into a column of type Numpy array.
-
-        :param drop_input_cols: Boolean flag to drop the input feature columns.
-        :param input_feature_cols: List of feature column names.
-        :param output_column_vector: Name of the output column that will contain the combined vector.
-        :param sdf: Pandas DataFrame
-
-        :return: DataFrame with column of type Numpy array.
-        """
-
-        # Combine the input columns into a single vector (Numpy array)
-        self.__df[output_column_vector] = self.__df[input_feature_cols].apply(lambda row: np.array(row), axis=1)
-
-        # Drop input columns if flag is set to True
-        if drop_input_cols:
-            return self.__df.drop(columns=input_feature_cols)
-        else:
-            return self.__df
-
-    def _disassemble_column_vector(self,
-                                   features_cols: List[str],
-                                   col_vector_name: str,
-                                   drop_col_vector: bool = True) -> pd.DataFrame:
-        """
-        Convert a column of Numpy arrays in DataFrame to individual columns (a.k.a features).
-        This is the reverse operation of `_assemble_column_vector`.
-
-        :param features_cols: List of new feature column names.
-        :param col_vector_name: Name of the column that contains the vector (Numpy array).
-        :param drop_col_vector: Boolean flag to drop the original vector column.
-        :return: DataFrame with individual feature columns.
-        """
-
-        # Unpack the vector (Numpy array) into individual columns
-        for i, feature in enumerate(features_cols):
-            self.__df[feature] = self.__df[col_vector_name].apply(lambda x: x[i])
-
-        # Drop the original vector column if needed
-        if drop_col_vector:
-            self.__df = self.__df.drop(columns=[col_vector_name])
-
-        return self.__df
+    def _features_to_array(self) -> np.array:
+        """
+        Collect features from FSDataFrame as an array.
+        `Warning`: This method will collect the entire DataFrame into the driver.
+        Uses this method on small datasets only (e.g., after filtering or splitting the data)
+
+        :return: Numpy array
+        """
+        sdf = self.get_df().select(*self.get_features_names())
+        a = np.array(sdf.collect())
+        return a
+
+    def _array_to_features(self, a: np.array) -> pd.DataFrame:
+        """
+        Convert a Numpy array to a DataFrame with features as columns.
+
+        :param a: Numpy array
+        :return: Pandas DataFrame
+        """
+        return pd.DataFrame(a, columns=self.get_features_names())

+    #
+    # def _assemble_column_vector(self,
+    #                             input_feature_cols: List[str],
+    #                             output_column_vector: str = 'features',
+    #                             drop_input_cols: bool = True) -> pd.DataFrame:
+    #     """
+    #     Assemble features (columns) from DataFrame into a column of type Numpy array.
+    #
+    #     :param drop_input_cols: Boolean flag to drop the input feature columns.
+    #     :param input_feature_cols: List of feature column names.
+    #     :param output_column_vector: Name of the output column that will contain the combined vector.
+    #     :param sdf: Pandas DataFrame
+    #
+    #     :return: DataFrame with column of type Numpy array.
+    #     """
+    #
+    #     # Combine the input columns into a single vector (Numpy array)
+    #     self.__df[output_column_vector] = self.__df[input_feature_cols].apply(lambda row: np.array(row), axis=1)
+    #
+    #     # Drop input columns if flag is set to True
+    #     if drop_input_cols:
+    #         return self.__df.drop(columns=input_feature_cols)
+    #     else:
+    #         return self.__df
+    #
+    #
+    # def _disassemble_column_vector(self,
+    #                                features_cols: List[str],
+    #                                col_vector_name: str,
+    #                                drop_col_vector: bool = True) -> pd.DataFrame:
+    #     """
+    #     Convert a column of Numpy arrays in DataFrame to individual columns (a.k.a features).
+    #     This is the reverse operation of `_assemble_column_vector`.
+    #
+    #     :param features_cols: List of new feature column names.
+    #     :param col_vector_name: Name of the column that contains the vector (Numpy array).
+    #     :param drop_col_vector: Boolean flag to drop the original vector column.
+    #     :return: DataFrame with individual feature columns.
+    #     """
+    #
+    #     # Unpack the vector (Numpy array) into individual columns
+    #     for i, feature in enumerate(features_cols):
+    #         self.__df[feature] = self.__df[col_vector_name].apply(lambda x: x[i])
+    #
+    #     # Drop the original vector column if needed
+    #     if drop_col_vector:
+    #         self.__df = self.__df.drop(columns=[col_vector_name])
+    #
+    #     return self.__df
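For reference, a pandas-only sketch of the feature-matrix round trip that the new _features_to_array / _array_to_features helpers describe, assuming the underlying data is a plain pandas DataFrame; the standalone function names and the f1/f2 columns are illustrative, not part of the commit:

from typing import List

import numpy as np
import pandas as pd


def features_to_array(df: pd.DataFrame, feature_names: List[str]) -> np.ndarray:
    # Collect only the feature columns as a 2-D NumPy array
    return df[feature_names].to_numpy()


def array_to_features(a: np.ndarray, feature_names: List[str]) -> pd.DataFrame:
    # Rebuild a DataFrame with the features as columns
    return pd.DataFrame(a, columns=feature_names)


df = pd.DataFrame({"f1": [1.0, 2.0], "f2": [3.0, 4.0]})
arr = features_to_array(df, ["f1", "f2"])
assert array_to_features(arr, ["f1", "f2"]).equals(df)  # lossless round trip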
