Skip to content

Commit

Permalink
update virgo generated dataset to use hdf5 format
Browse files Browse the repository at this point in the history
  • Loading branch information
jarlsondre committed Nov 6, 2024
1 parent efa59e8 commit 3f7282f
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 114 deletions.
232 changes: 154 additions & 78 deletions use-cases/virgo/data.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
from pathlib import Path
from typing import Optional, Tuple

import h5py
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
Expand All @@ -16,11 +18,7 @@


class TimeSeriesDatasetGenerator(DataGetter):
def __init__(
self,
data_root: str = "data",
name: Optional[str] = None
) -> None:
def __init__(self, data_root: str = "data", name: Optional[str] = None) -> None:
"""Initialize the TimeSeriesDatasetGenerator class.
Args:
Expand All @@ -42,32 +40,102 @@ def execute(self) -> pd.DataFrame:
pd.DataFrame: dataset of Q-plot images.
"""
df_aux_ts = generate_dataset_aux_channels(
1000, 3, duration=16, sample_rate=500,
num_waves_range=(20, 25), noise_amplitude=0.6
1000,
3,
duration=16,
sample_rate=500,
num_waves_range=(20, 25),
noise_amplitude=0.6,
)
df_main_ts = generate_dataset_main_channel(
df_aux_ts, weights=None, noise_amplitude=0.1
)

# save datasets
save_name_main = 'TimeSeries_dataset_synthetic_main.pkl'
save_name_aux = 'TimeSeries_dataset_synthetic_aux.pkl'
save_name_main = "TimeSeries_dataset_synthetic_main.pkl"
save_name_aux = "TimeSeries_dataset_synthetic_aux.pkl"
df_main_ts.to_pickle(os.path.join(self.data_root, save_name_main))
df_aux_ts.to_pickle(os.path.join(self.data_root, save_name_aux))

# Transform to images and save to disk
df_ts = pd.concat([df_main_ts, df_aux_ts], axis=1)
df = generate_cut_image_dataset(
df_ts, list(df_ts.columns),
num_processes=20, square_size=64
df_ts, list(df_ts.columns), num_processes=20, square_size=64
)
save_name = 'Image_dataset_synthetic_64x64.pkl'
save_name = "Image_dataset_synthetic_64x64.pkl"
df.to_pickle(os.path.join(self.data_root, save_name))
return df


class SyntheticTimeSeriesDataset2(Dataset):
def __init__(
self, file: str, chunk_size: int = 500, hdf5_dataset_name: str = "virgo_dataset"
):
"""Initialize the DataFrameDataset class.
Args:
root_folder (str): Location of the pickled DataFrames.
"""
file_path = Path(file)
if not file_path.exists():
raise ValueError(
f"Given file location, {file_path.resolve()} does not exist. "
)
self.hdf5_dataset_name = hdf5_dataset_name
self.file_path = file_path
self.chunk_size = chunk_size
with h5py.File(self.file_path, "r") as f:
if self.hdf5_dataset_name not in f:
raise ValueError(
f"The dataset {self.hdf5_dataset_name} does not exist in "
f"the given HDF5 file."
)
dataset = f[self.hdf5_dataset_name]
self.num_datapoints = dataset.shape[0]

if self.num_datapoints == 0:
raise ValueError("The given file contains 0 data points!")

if self.num_datapoints % self.chunk_size != 0:
print(
f"[WARNING]: Number of datapoints, {self.num_datapoints} is not "
f"divisible by the given chunk size, {self.chunk_size}, so the "
f"remainder will be truncated. "
)
self.length = self.num_datapoints // self.chunk_size

def __len__(self):
"""Return the total number of files in the dataset."""
return self.length

def __getitem__(self, idx) -> torch.Tensor:
"""Retrieve a data sample by index and normalize.
Args:
idx (int): Index of the file to retrieve.
Returns:
torch.Tensor: Normalized tensor for specific idx
"""
if idx >= len(self):
raise ValueError(
f"Index {idx} out of bounds for dataset with length {len(self)}!"
)

offset = idx * self.chunk_size
with h5py.File(self.file_path, "r") as f:
dataset = f[self.hdf5_dataset_name]
data = dataset[offset : offset + self.chunk_size]

data = torch.tensor(data, dtype=torch.float32)
return normalize_(data)


class SyntheticTimeSeriesDataset(Dataset):
def __init__(self, root_folder: Optional[str] = None,):
def __init__(
self,
root_folder: Optional[str] = None,
):
"""Initialize the DataFrameDataset class.
Args:
Expand All @@ -79,16 +147,19 @@ def __init__(self, root_folder: Optional[str] = None,):
raise FileNotFoundError(f"Root folder '{root_folder}' not found.")

# Find all file paths in root folder
self.file_paths = [os.path.join(dirpath, f)
for dirpath, dirs, files in os.walk(root_folder)
for f in files if f.endswith('.pkl')]
self.file_paths = [
os.path.join(dirpath, f)
for dirpath, dirs, files in os.walk(root_folder)
for f in files
if f.endswith(".pkl")
]

def __len__(self):
"""Return the total number of files in the dataset."""
return len(self.file_paths)

def __getitem__(self, idx):
""" Retrieve a data sample by index, convert to tensor, and normalize.
"""Retrieve a data sample by index, convert to tensor, and normalize.
Args:
idx (int): Index of the file to retrieve.
Expand Down Expand Up @@ -118,15 +189,23 @@ def __getitem__(self, idx):

# Stack the main and auxiliary channels into 2D tensors
signal_data_train_2d = torch.stack(
[torch.stack(
[df_main_all_2d[main_channel].iloc[i]]) for i in range(df_main_all_2d.shape[0])])
[
torch.stack([df_main_all_2d[main_channel].iloc[i]])
for i in range(df_main_all_2d.shape[0])
]
)

aux_data_train_2d = torch.stack(
[torch.stack(
[df_aux_all_2d.iloc[i, 0],
df_aux_all_2d.iloc[i, 1],
df_aux_all_2d.iloc[i, 2]]
) for i in range(df_aux_all_2d.shape[0])]
[
torch.stack(
[
df_aux_all_2d.iloc[i, 0],
df_aux_all_2d.iloc[i, 1],
df_aux_all_2d.iloc[i, 2],
]
)
for i in range(df_aux_all_2d.shape[0])
]
)

# Concatenate the main and auxiliary channel tensors
Expand All @@ -144,7 +223,7 @@ def __init__(
test_proportion: int | float = 0.0,
root_folder: Optional[str] = None,
rnd_seed: Optional[int] = None,
name: Optional[str] = None
name: Optional[str] = None,
) -> None:
"""Initialize the splitter for time-series datasets.
Expand All @@ -156,12 +235,7 @@ def __init__(
root_folder (str | None): Folder containing the dataset files.
name (str | None): Name of the data splitter.
"""
super().__init__(
train_proportion,
validation_proportion,
test_proportion,
name
)
super().__init__(train_proportion, validation_proportion, test_proportion, name)
self.save_parameters(**self.locals2params(locals()))
self.rnd_seed = rnd_seed
self.root_folder = root_folder
Expand All @@ -183,10 +257,8 @@ def execute(self) -> Tuple[Dataset, Dataset, Dataset]:
generator = torch.Generator().manual_seed(self.rnd_seed)
[train_dataset, validation_dataset, test_dataset] = random_split(
whole_dataset,
[self.train_proportion,
self.validation_proportion,
self.test_proportion],
generator=generator
[self.train_proportion, self.validation_proportion, self.test_proportion],
generator=generator,
)
print(f"Shape of item: {train_dataset.__getitem__(idx=5).shape}")
return train_dataset, validation_dataset, test_dataset
Expand All @@ -200,26 +272,23 @@ def __init__(
test_proportion: int | float = 0.0,
rnd_seed: Optional[int] = None,
images_dataset: str = "data/Image_dataset_synthetic_64x64.pkl",
name: Optional[str] = None
name: Optional[str] = None,
) -> None:
"""Class for splitting of smaller datasets. Use this class in the pipeline if the
"""Class for splitting of smaller datasets. Use this class in the pipeline if the
entire dataset can fit into memory.
Args:
train_proportion (int | float): _description_
validation_proportion (int | float, optional): _description_. Defaults to 0.0.
test_proportion (int | float, optional): _description_. Defaults to 0.0.
rnd_seed (Optional[int], optional): _description_. Defaults to None.
images_dataset (str, optional): _description_.
images_dataset (str, optional): _description_.
Defaults to "data/Image_dataset_synthetic_64x64.pkl".
name (Optional[str], optional): _description_. Defaults to None.
"""
super().__init__(
train_proportion, validation_proportion,
test_proportion, name
)
super().__init__(train_proportion, validation_proportion, test_proportion, name)
self.save_parameters(**self.locals2params(locals()))
self.validation_proportion = 1-train_proportion
self.validation_proportion = 1 - train_proportion
self.rnd_seed = rnd_seed
self.images_dataset = images_dataset

Expand All @@ -231,10 +300,7 @@ def get_or_load(self, dataset: Optional[pd.DataFrame] = None):
return dataset

@monitor_exec
def execute(
self,
dataset: Optional[pd.DataFrame] = None
) -> Tuple:
def execute(self, dataset: Optional[pd.DataFrame] = None) -> Tuple:
"""Splits a dataset into train, validation and test splits.
Args:
Expand All @@ -257,8 +323,11 @@ def execute(
df_aux_all_2d = pd.DataFrame(df[aux_channels])
df_main_all_2d = pd.DataFrame(df[main_channel])
X_train_2d, X_test_2d, y_train_2d, y_test_2d = train_test_split(
df_aux_all_2d, df_main_all_2d,
test_size=self.validation_proportion, random_state=self.rnd_seed)
df_aux_all_2d,
df_main_all_2d,
test_size=self.validation_proportion,
random_state=self.rnd_seed,
)
return (X_train_2d, y_train_2d), (X_test_2d, y_test_2d)


Expand All @@ -274,9 +343,7 @@ def __init__(self, name: str | None = None) -> None:

@monitor_exec
def execute(
self,
train_dataset: Tuple,
validation_dataset: Tuple
self, train_dataset: Tuple, validation_dataset: Tuple
) -> Tuple[torch.Tensor, torch.Tensor, None]:
"""Pre-process datasets: rearrange and normalize before training.
Expand Down Expand Up @@ -309,20 +376,27 @@ def execute(
# ]) # for i in range(X_train.shape[0])

# whole dataset
signal_data_train_2d = torch.stack([
torch.stack([y_train_2d[main_channel].iloc[i]])
for i in range(y_train_2d.shape[0])
])
aux_data_train_2d = torch.stack([
torch.stack(
[X_train_2d.iloc[i][0], X_train_2d.iloc[i][1],
X_train_2d.iloc[i][2]])
for i in range(X_train_2d.shape[0])
])
signal_data_train_2d = torch.stack(
[
torch.stack([y_train_2d[main_channel].iloc[i]])
for i in range(y_train_2d.shape[0])
]
)
aux_data_train_2d = torch.stack(
[
torch.stack(
[
X_train_2d.iloc[i][0],
X_train_2d.iloc[i][1],
X_train_2d.iloc[i][2],
]
)
for i in range(X_train_2d.shape[0])
]
)

# concatenate torch.tensors
train_data_2d = torch.cat(
[signal_data_train_2d, aux_data_train_2d], dim=1)
train_data_2d = torch.cat([signal_data_train_2d, aux_data_train_2d], dim=1)
# train_data_small_2d = torch.cat(
# [signal_data_train_small_2d, aux_data_train_small_2d], dim=1)

Expand All @@ -342,20 +416,22 @@ def execute(
# ]) # for i in range(X_test.shape[0])

# whole dataset
signal_data_test_2d = torch.stack([
torch.stack(
[y_test_2d[main_channel].iloc[i]])
for i in range(y_test_2d.shape[0])
])
aux_data_test_2d = torch.stack([
torch.stack(
[X_test_2d.iloc[i, 0], X_test_2d.iloc[i, 1],
X_test_2d.iloc[i, 2]])
for i in range(X_test_2d.shape[0])
])

test_data_2d = torch.cat(
[signal_data_test_2d, aux_data_test_2d], dim=1)
signal_data_test_2d = torch.stack(
[
torch.stack([y_test_2d[main_channel].iloc[i]])
for i in range(y_test_2d.shape[0])
]
)
aux_data_test_2d = torch.stack(
[
torch.stack(
[X_test_2d.iloc[i, 0], X_test_2d.iloc[i, 1], X_test_2d.iloc[i, 2]]
)
for i in range(X_test_2d.shape[0])
]
)

test_data_2d = torch.cat([signal_data_test_2d, aux_data_test_2d], dim=1)
# test_data_small_2d = torch.cat(
# [signal_data_test_small_2d, aux_data_test_small_2d], dim=1)

Expand Down
3 changes: 1 addition & 2 deletions use-cases/virgo/src/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,7 @@ def generate_cut_image_dataset(df, channels, num_processes=20, square_size=128):
# Use map to pass multiple arguments to process_image
results = list(pool.starmap(process_image, args))

df = pd.concat(results, ignore_index=True)
return df
return pd.concat(results, ignore_index=True)


def show_dataset(df, size, num_plots=10):
Expand Down
Loading

0 comments on commit 3f7282f

Please sign in to comment.