minor changes to store sparse matrices
ypriverol committed Sep 21, 2024
1 parent f75093d commit f15b4e8
Showing 3 changed files with 101 additions and 36 deletions.
109 changes: 83 additions & 26 deletions fsspark/fs/fdataframe.py
@@ -1,10 +1,12 @@
import logging
from typing import List, Tuple
from typing import List, Tuple, Optional

import numpy
import numpy as np
import pandas as pd
import psutil
from pandas import DataFrame
from scipy import sparse
from sklearn.preprocessing import MinMaxScaler, MaxAbsScaler, StandardScaler, RobustScaler, LabelEncoder

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
@@ -28,45 +30,97 @@ class FSDataFrame:
[...]
"""

def __init__(
self,
df: DataFrame,
sample_col: str = None,
label_col: str = None,
df: pd.DataFrame,
sample_col: Optional[str] = None,
label_col: Optional[str] = None,
sparse_threshold: float = 0.7, # Threshold for sparsity
memory_threshold: Optional[float] = 0.75 # Proportion of system memory to use for dense arrays
):
"""
Create an instance of FSDataFrame.
Expected an input DataFrame with 2+N columns.
After specifying sample id and sample label columns, the remaining N columns will be considered as features.
The input DataFrame should contain 2+N columns. After specifying the sample id and label columns,
the remaining N columns will be considered features. The feature columns should contain only numerical data.
The DataFrame is stored in a dense or sparse format based on the sparsity of the data and available memory.
:param df: Pandas DataFrame
:param sample_col: Sample id column name
:param label_col: Sample label column name
:param df: Input Pandas DataFrame
:param sample_col: Column name for sample identifiers (optional)
:param label_col: Column name for labels (required)
:param sparse_threshold: Threshold for sparsity, default is 70%. If the proportion of zero entries
in the feature matrix exceeds this value, the matrix is stored in a sparse format unless memory allows.
:param memory_threshold: Proportion of system memory available to use before deciding on sparse/dense.
"""
self.__df = df.copy()

# Check for necessary columns
columns_to_drop = []

if sample_col is None:
# Handle sample column
if sample_col:
if sample_col not in df.columns:
raise ValueError(f"Sample column '{sample_col}' not found in DataFrame.")
self.__sample_col = sample_col
self.__samples = df[sample_col].tolist()
columns_to_drop.append(sample_col)
else:
self.__sample_col = None
self.__samples = []
logging.info("No sample column specified.")
else:
self.__sample_col = sample_col
self.__samples = df[sample_col].tolist()
df = df.drop(columns=[sample_col])

# Handle label column
if label_col is None:
raise ValueError("No label column specified. A class/label column is required.")
raise ValueError("A label column is required but was not specified.")
if label_col not in df.columns:
raise ValueError(f"Label column '{label_col}' not found in DataFrame.")

self.__label_col = label_col
self.__labels = df[label_col].tolist()

# Encode labels
label_encoder = LabelEncoder()
self.__labels_matrix = label_encoder.fit_transform(df[label_col]).tolist()
columns_to_drop.append(label_col)

# Drop both sample and label columns in one step
self.__df = self.__df.drop(columns=columns_to_drop)

# Extract features
self.__original_features = self.__df.columns.tolist()

# Ensure only numerical features are retained
numerical_df = self.__df.select_dtypes(include=[np.number])
if numerical_df.empty:
raise ValueError("No numerical features found in the DataFrame.")

# Check sparsity
num_elements = numerical_df.size
num_zeros = (numerical_df == 0).sum().sum()
sparsity = num_zeros / num_elements

dense_matrix_size = numerical_df.memory_usage(deep=True).sum() # In bytes
available_memory = psutil.virtual_memory().available # In bytes

if sparsity > sparse_threshold:
if dense_matrix_size < memory_threshold * available_memory:
# Use dense matrix if enough memory is available
logging.info(f"Data is sparse (sparsity={sparsity:.2f}) but enough memory available. "
f"Using a dense matrix.")
self.__matrix = numerical_df.to_numpy(dtype=np.float32)
self.__is_sparse = False
else:
# Use sparse matrix due to memory constraints
logging.info(f"Data is sparse (sparsity={sparsity:.2f}), memory insufficient for dense matrix. "
f"Using a sparse matrix representation.")
self.__matrix = sparse.csr_matrix(numerical_df.to_numpy(dtype=np.float32))
self.__is_sparse = True
else:
self.__label_col = label_col
self.__labels = df[label_col].tolist()
label_encoder = LabelEncoder()
self.__labels_matrix = label_encoder.fit_transform(df[label_col]).tolist()
df = df.drop(columns=[label_col])

self.__original_features = df.columns.tolist()
numerical_df = df.select_dtypes(include=[np.number])
self.__matrix = numerical_df.to_numpy(dtype=np.float32)
# Use dense matrix since it's not sparse
logging.info(f"Data is not sparse (sparsity={sparsity:.2f}), using a dense matrix.")
self.__matrix = numerical_df.to_numpy(dtype=np.float32)
self.__is_sparse = False

self.__is_scaled = (False, None)

def get_feature_matrix(self) -> numpy.array:
@@ -124,7 +178,7 @@ def scale_features(self, scaler_method: str = 'standard', **kwargs) -> bool:
else:
raise ValueError("`scaler_method` must be one of: min_max, max_abs, standard or robust.")

# TODO: Scale only the features for now, we have to investigate if we scale cateogrical variables
# TODO: Scale only the features for now, we have to investigate if we scale categorical variables
self.__matrix = scaler.fit_transform(self.__matrix)
self.__is_scaled = (True, scaler_method)
return True
@@ -135,6 +189,9 @@ def is_scaled(self):
def get_scaled_method(self):
return self.__is_scaled[1]

def is_sparse(self):
return self.__is_sparse

def select_features_by_index(self, feature_indexes: List[int]) -> 'FSDataFrame':
"""
Keep only the specified features (by index) and return an updated instance of FSDataFrame.
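
A minimal standalone sketch of the dense/sparse storage decision introduced in __init__ above, assuming the same defaults (sparse_threshold=0.7, memory_threshold=0.75); the helper name choose_storage is illustrative and does not exist in the repository:

import numpy as np
import pandas as pd
import psutil
from scipy import sparse

def choose_storage(numerical_df: pd.DataFrame,
                   sparse_threshold: float = 0.7,
                   memory_threshold: float = 0.75):
    """Pick a dense or sparse representation the same way FSDataFrame.__init__ does."""
    # Proportion of exact zeros in the feature matrix (NaNs are not counted as zeros)
    sparsity = (numerical_df == 0).sum().sum() / numerical_df.size

    dense_bytes = numerical_df.memory_usage(deep=True).sum()  # size of the dense frame in bytes
    available = psutil.virtual_memory().available             # free system memory in bytes

    if sparsity > sparse_threshold and dense_bytes >= memory_threshold * available:
        # Very sparse and too large for a dense copy: store as CSR
        return sparse.csr_matrix(numerical_df.to_numpy(dtype=np.float32)), True
    # Otherwise keep a dense float32 array
    return numerical_df.to_numpy(dtype=np.float32), False

Note that a frame can exceed the sparsity threshold and still be stored densely when memory allows, which the logging messages above make explicit.
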
25 changes: 16 additions & 9 deletions fsspark/tests/test_fsdataframe.py
@@ -55,41 +55,48 @@ def test_scaler_df():
assert fs_df.get_scaled_method() == 'standard'

def test_memory_fsdataframe():
def create_test_data(n_samples, n_features):
def create_test_data(n_samples: int, n_features: int, zero_prob: float = 0.1, nan_prob: float = 0.05):
"""Create test data for FSDataFrame."""
data = np.random.rand(n_samples, n_features)
data[np.random.rand(n_samples, n_features) < zero_prob] = 0
data[np.random.rand(n_samples, n_features) < nan_prob] = np.nan

df = pd.DataFrame(data, columns=[f'feature_{i}' for i in range(n_features)])
df['sample_id'] = [f'sample_{i}' for i in range(n_samples)]
df['label'] = np.random.choice(['A', 'B'], n_samples)
return df

def measure_memory_usage(n_samples, n_features):
def measure_memory_usage(n_samples: int, n_features: int, nan_prob = 0.01) -> float:
"""Measure memory usage of FSDataFrame for given number of samples and features."""
df = create_test_data(n_samples, n_features)
df = create_test_data(n_samples, n_features, nan_prob=nan_prob)
mem_usage = memory_usage((FSDataFrame, (df, 'sample_id', 'label')), max_iterations=1)[0]
gc.collect() # Force garbage collection to free memory
return mem_usage

# Define test cases
feature_sizes = [1000, 5000, 10000, 50000, 100_000, 1_000_000]
sample_sizes = [10, 50, 100, 500, 1000]
sample_sizes = [100, 500, 1000]
nan_prob = [0.05, 0.1, 0.2, 0.5]

# Measure memory usage for each test case
results = []
for n_samples in sample_sizes:
for n_features in feature_sizes:
mem_usage = measure_memory_usage(n_samples, n_features)
results.append((n_samples, n_features, mem_usage))
for prob in nan_prob:
mem_usage = measure_memory_usage(n_samples, n_features, nan_prob=prob)
results.append((n_samples, n_features, prob, mem_usage)) # Append prob to results

# Convert results to DataFrame
results_df = pd.DataFrame(results, columns=['Samples', 'Features', 'Memory (MB)'])
results_df = pd.DataFrame(results, columns=['Samples', 'Features', 'NAN Prob', 'Memory (MB)'])

# Create 2D line plot
plt.figure(figsize=(12, 8))

for feature_size in feature_sizes:
data = results_df[results_df['Features'] == feature_size]
plt.plot(data['Samples'], data['Memory (MB)'], marker='o', label=f'{feature_size} Features')
for prob in nan_prob:
data = results_df[(results_df['Features'] == feature_size) & (results_df['NAN Prob'] == prob)]
plt.plot(data['Samples'], data['Memory (MB)'], marker='o',
label=f'{feature_size} Features - {prob} NAN Prob')

plt.xlabel('Number of Samples')
plt.ylabel('Memory Usage (MB)')
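
A rough usage sketch for the sparse-aware constructor and the new is_sparse() accessor; the import path follows the changed file fsspark/fs/fdataframe.py, and the 90%-zeros toy data is an assumption for illustration only:

import numpy as np
import pandas as pd
from fsspark.fs.fdataframe import FSDataFrame

# Toy frame with ~90% zeros so the default sparsity threshold (0.7) is exceeded
rng = np.random.default_rng(0)
data = rng.random((100, 1000))
data[rng.random((100, 1000)) < 0.9] = 0

df = pd.DataFrame(data, columns=[f"feature_{i}" for i in range(1000)])
df["sample_id"] = [f"sample_{i}" for i in range(100)]
df["label"] = rng.choice(["A", "B"], 100)

fs_df = FSDataFrame(df, sample_col="sample_id", label_col="label")
# True only when the dense copy would not fit within the memory_threshold share of RAM;
# a small frame like this one stays dense even though it is sparse.
print(fs_df.is_sparse())
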
3 changes: 2 additions & 1 deletion requirements.txt
@@ -2,4 +2,5 @@ networkx
numpy
setuptools
pandas
scikit-learn
scikit-learn
scipy
