diff --git a/environment.yml b/environment.yml index 9f48f63..7d4be58 100644 --- a/environment.yml +++ b/environment.yml @@ -10,5 +10,4 @@ dependencies: - pyspark~=3.3.0 - networkx~=2.8.7 - numpy~=1.23.4 - - pandas~=1.5.1 - pyarrow~=8.0.0 diff --git a/fsspark/fs/fdataframe.py b/fsspark/fs/fdataframe.py index 53e8327..21cc06c 100644 --- a/fsspark/fs/fdataframe.py +++ b/fsspark/fs/fdataframe.py @@ -51,27 +51,25 @@ def __init__( :param parse_features: Coerce all features to float. """ - self.__df = df self.__sample_col = sample_col self.__label_col = label_col - self.__row_index_name = row_index_col + self.__row_index_col = row_index_col + self.__df = df # check input dataframe self._check_df() # replace dots in column names, if any. if parse_col_names: - # TODO: Dots in column names are prone to errors, since dots are used to access attributes from DataFrame. - # Should we make this replacement optional? Or print out a warning? self.__df = self.__df.toDF(*(c.replace('.', '_') for c in self.__df.columns)) # If the specified row index column name does not exist, add row index to the dataframe - if self.__row_index_name not in self.__df.columns: - self.__df = self._add_row_index(index_name=self.__row_index_name) + if self.__row_index_col not in self.__df.columns: + self.__df = self._add_row_index(index_name=self.__row_index_col) if parse_features: # coerce all features to float - non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_name] + non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_col] feature_cols = [c for c in self.__df.columns if c not in non_features_cols] self.__df = self.__df.withColumns({c: self.__df[c].cast('float') for c in feature_cols}) @@ -88,7 +86,7 @@ def _check_df(self): raise ValueError(f"Column sample name {self.__sample_col} not found...") elif self.__label_col not in col_names: raise ValueError(f"Column label name {self.__label_col} not found...") - elif not isinstance(self.__row_index_name, str): + elif not isinstance(self.__row_index_col, str): raise ValueError("Row index column name must be a valid string...") else: pass @@ -98,21 +96,24 @@ def _set_indexed_cols(self) -> Series: Create a distributed indexed Series representing features. :return: Pandas on (PoS) Series """ - non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_name] + non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_col] features = [f for f in self.__df.columns if f not in non_features_cols] return Series(features) - def _set_indexed_rows(self) -> Series: + def _set_indexed_rows(self) -> pd.Series: """ - Create a distributed indexed Series representing samples labels. - It will use existing row indices, if any. + Create an indexed Series representing sample labels. + It will use existing row indices from the DataFrame. :return: Pandas Series """ - label = self.__df[self.__label_col] - row_index = self.__df[self.__row_index_name] - return pd.Series(data=label.values, index=row_index.values) + # Extract the label and row index columns from the DataFrame + labels = self.__df[self.__label_col] + row_indices = self.__df[self.__row_index_col] + + # Create a Pandas Series with row_indices as index and labels as values + return pd.Series(data=labels.values, index=row_indices.values) def get_features_indexed(self) -> Series: """ @@ -224,7 +225,7 @@ def get_row_index_name(self) -> str: :return: Row id column name. """ - return self.__row_index_name + return self.__row_index_col def _add_row_index(self, index_name: str = '_row_index') -> pd.DataFrame: """ @@ -277,12 +278,12 @@ def filter_features(self, features: List[str], keep: bool = True) -> 'FSDataFram sdf = sdf.select( self.__sample_col, self.__label_col, - self.__row_index_name, + self.__row_index_col, *features) else: sdf = sdf.drop(*features) - fsdf_filtered = self.update(sdf, self.__sample_col, self.__label_col, self.__row_index_name) + fsdf_filtered = self.update(sdf, self.__sample_col, self.__label_col, self.__row_index_col) count_b = fsdf_filtered.count_features() logger.info(f"{count_b} features out of {count_a} remain after applying this filter...") diff --git a/fsspark/tests/test_FSDataFrame.py b/fsspark/tests/test_FSDataFrame.py deleted file mode 100644 index 2376b99..0000000 --- a/fsspark/tests/test_FSDataFrame.py +++ /dev/null @@ -1,79 +0,0 @@ -import unittest - -from fsspark.config.context import init_spark, stop_spark_session -from fsspark.fs.core import FSDataFrame -from fsspark.utils.datasets import get_tnbc_data_path -from fsspark.utils.io import import_table_as_psdf - - -class FSDataFrameTest(unittest.TestCase): - """ - Define testing methods for FSDataFrame class. - """ - - def setUp(self) -> None: - init_spark(apply_pyarrow_settings=True, - apply_extra_spark_settings=True, - apply_pandas_settings=True) - - def tearDown(self) -> None: - stop_spark_session() - - @staticmethod - def import_FSDataFrame(): - df = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5) - fsdf = FSDataFrame(df, sample_col='Sample', label_col='label') - return fsdf - - def test_FSDataFrame(self): - """ - Test FSDataFrame class. - :return: None - """ - - # create object of type FSDataFrame - fsdf = self.import_FSDataFrame() - - self.assertEqual(len(fsdf.get_features_names()), 500) - self.assertEqual(len(fsdf.get_sample_label()), 44) - - def test_get_sdf_vector(self): - """ - Test get_sdf_vector method. - :return: None - """ - - fsdf = self.import_FSDataFrame() - - sdf = fsdf.get_sdf_vector(output_column_vector='features') - sdf.show(5) - self.assertEqual(len(sdf.columns), 4) - - def test_scale_features(self): - """ - Test scale_features method. - :return: None - """ - - fsdf = self.import_FSDataFrame() - fsdf = fsdf.scale_features(scaler_method='min_max') - - fsdf.get_sdf().show(10) - self.assertGreaterEqual(min(fsdf.to_psdf()['tr|E9PBJ4'].to_numpy()), 0.0) - self.assertLessEqual(max(fsdf.to_psdf()['tr|E9PBJ4'].to_numpy()), 1.0) - - def test_get_features_indices(self): - """ - Test get_features_indices method. - :return: None - """ - - fsdf = self.import_FSDataFrame() - feature_indices = fsdf.get_features_indexed() - feature_names = feature_indices.loc[[0, 1, 2, 5]].tolist() - - self.assertTrue(all([x in ['tr|E9PBJ4', 'sp|P07437', 'sp|P68371', 'tr|F8VWX4'] for x in feature_names])) - - -if __name__ == '__main__': - unittest.main() diff --git a/fsspark/tests/test_fsdataframe.py b/fsspark/tests/test_fsdataframe.py new file mode 100644 index 0000000..09fc2ac --- /dev/null +++ b/fsspark/tests/test_fsdataframe.py @@ -0,0 +1,27 @@ +import pytest +import pandas as pd +from fsspark.fs.fdataframe import FSDataFrame + +def test_initializes_fsdataframe(): + + # Create a sample DataFrame + data = { + 'sample_id': [1, 2, 3], + 'label': ['A', 'B', 'C'], + 'feature1': [0.1, 0.2, 0.3], + 'feature2': [1.1, 1.2, 1.3] + } + df = pd.DataFrame(data) + + # Initialize FSDataFrame + fs_df = FSDataFrame( + df=df, + sample_col='sample_id', + label_col='label', + row_index_col='_row_index', + parse_col_names=False, + parse_features=False + ) + + # Assertions to check if the initialization is correct + assert (fs_df.get_sdf(), df) \ No newline at end of file