remove old generation files and add script for concatenating hdf5 files
jarlsondre committed Nov 7, 2024
1 parent 31fdbd2 commit 3e674e6
Showing 6 changed files with 132 additions and 129 deletions.
9 changes: 3 additions & 6 deletions use-cases/virgo/config.yaml
@@ -1,10 +1,8 @@
root_folder: /p/scratch/intertwin/datasets/virgo
hdf5_file_location: /p/project1/intertwin/saether1/itwinai/use-cases/virgo/data/virgo_data.hdf5
hdf5_dataset_name: virgo_dataset
chunk_size: 5
data_root: ./data
epochs: 10
batch_size: 3
batch_size: 10 # equivalent to chunk size
learning_rate: 0.0001
strategy: ddp
checkpoint_path: checkpoints/epoch_{}.pth
@@ -19,15 +17,14 @@ training_pipeline:
train_proportion: 0.9
validation_proportion: 0.1
rnd_seed: 42
root_folder: ${root_folder}
hdf5_file_location: ${hdf5_file_location}
chunk_size: ${chunk_size}
chunk_size: ${batch_size}
hdf5_dataset_name: ${hdf5_dataset_name}
- class_path: trainer.NoiseGeneratorTrainer
init_args:
config:
generator: simple #unet
batch_size: ${batch_size}
batch_size: 1
optim_lr: ${learning_rate}
loss: l1
save_best: true
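For context, the ${...} references in this config are resolved when the pipeline configuration is loaded, so chunk_size now follows batch_size, while the trainer's own batch_size is pinned to 1 because each HDF5 chunk already forms a batch. A minimal sketch of that interpolation behaviour, assuming an OmegaConf-style resolver (the actual config loader is not shown in this commit, and the key names below are trimmed down):

from omegaconf import OmegaConf

# Hypothetical, trimmed-down config mirroring the structure above
cfg = OmegaConf.create(
    {
        "batch_size": 10,  # equivalent to chunk size
        "splitter": {"chunk_size": "${batch_size}"},
        "trainer": {"batch_size": 1},  # one HDF5 chunk per training step
    }
)
print(OmegaConf.to_container(cfg, resolve=True))
# {'batch_size': 10, 'splitter': {'chunk_size': 10}, 'trainer': {'batch_size': 1}}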
101 changes: 8 additions & 93 deletions use-cases/virgo/data.py
@@ -67,17 +67,18 @@ def execute(self) -> pd.DataFrame:
return df


class SyntheticTimeSeriesDataset2(Dataset):
class SyntheticTimeSeriesDatasetHDF5(Dataset):
def __init__(
self, file: str, chunk_size: int = 500, hdf5_dataset_name: str = "virgo_dataset"
self, hdf5_file_location: str, chunk_size: int = 500, hdf5_dataset_name: str = "virgo_dataset"
):
"""Initialize the DataFrameDataset class.
Args:
root_folder (str): Location of the pickled DataFrames.
hdf5_file_location: Location of the HDF5 file containing the dataset
chunk_size: How many rows of data points each sample should contain
hdf5_dataset_name: The name of the Dataset object of the HDF5 file
"""
print(f"Running the SyntheticTimeSeriesDataset2!")
file_path = Path(file)
file_path = Path(hdf5_file_location)
if not file_path.exists():
raise ValueError(
f"Given file location, {file_path.resolve()} does not exist. "
@@ -132,97 +133,12 @@ def __getitem__(self, idx) -> torch.Tensor:
return normalize_(data)
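
The new class reads chunks of rows straight from the HDF5 file instead of unpickling one DataFrame per sample. Since the diff only shows fragments of the class, the following is an illustrative sketch of the chunked-access pattern rather than the code from this commit (the class name, the length logic and the skipped normalize_ step are assumptions):

import h5py
import torch
from torch.utils.data import Dataset


class ChunkedHDF5Dataset(Dataset):
    """Sketch: one sample = chunk_size consecutive rows of the HDF5 dataset."""

    def __init__(
        self,
        hdf5_file_location: str,
        chunk_size: int = 500,
        hdf5_dataset_name: str = "virgo_dataset",
    ):
        self.hdf5_file_location = hdf5_file_location
        self.chunk_size = chunk_size
        self.hdf5_dataset_name = hdf5_dataset_name
        with h5py.File(hdf5_file_location, "r") as f:
            self.num_rows = f[hdf5_dataset_name].shape[0]

    def __len__(self):
        # Only full chunks are served
        return self.num_rows // self.chunk_size

    def __getitem__(self, idx):
        start = idx * self.chunk_size
        with h5py.File(self.hdf5_file_location, "r") as f:
            data = f[self.hdf5_dataset_name][start : start + self.chunk_size]
        # The real implementation normalizes here via normalize_()
        return torch.from_numpy(data)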


class SyntheticTimeSeriesDataset(Dataset):
def __init__(
self,
root_folder: Optional[str] = None,
):
"""Initialize the DataFrameDataset class.
Args:
root_folder (str): Location of the pickled DataFrames.
"""

# Ensure the root folder exists
if not os.path.isdir(root_folder):
raise FileNotFoundError(f"Root folder '{root_folder}' not found.")

# Find all file paths in root folder
self.file_paths = [
os.path.join(dirpath, f)
for dirpath, dirs, files in os.walk(root_folder)
for f in files
if f.endswith(".pkl")
]

def __len__(self):
"""Return the total number of files in the dataset."""
return len(self.file_paths)

def __getitem__(self, idx):
"""Retrieve a data sample by index, convert to tensor, and normalize.
Args:
idx (int): Index of the file to retrieve.
Returns:
torch.Tensor: Concatenated and normalized data tensor
of main and auxiliary channels.
"""
# Load a single dataframe from the file
dataframe = pd.read_pickle(self.file_paths[idx])

# Convert all data in the DataFrame to torch tensors
df = dataframe.map(lambda x: torch.tensor(x))

# Divide Image dataset in main and aux channels.
main_channel = list(df.columns)[0]
aux_channels = list(df.columns)[1:]

# Ensure that there are at least 3 auxiliary channels
if len(aux_channels) < 3:
print(f"Item with the index {idx} only has {len(aux_channels)} channels!")
return None

# Extract the main channel and auxiliary channels
df_aux_all_2d = pd.DataFrame(df[aux_channels])
df_main_all_2d = pd.DataFrame(df[main_channel])

# Stack the main and auxiliary channels into 2D tensors
signal_data_train_2d = torch.stack(
[
torch.stack([df_main_all_2d[main_channel].iloc[i]])
for i in range(df_main_all_2d.shape[0])
]
)

aux_data_train_2d = torch.stack(
[
torch.stack(
[
df_aux_all_2d.iloc[i, 0],
df_aux_all_2d.iloc[i, 1],
df_aux_all_2d.iloc[i, 2],
]
)
for i in range(df_aux_all_2d.shape[0])
]
)

# Concatenate the main and auxiliary channel tensors
data_tensor = torch.cat([signal_data_train_2d, aux_data_train_2d], dim=1)

# Normalize and return the concatenated tensor
return normalize_(data_tensor)


class TimeSeriesDatasetSplitter(DataSplitter):
def __init__(
self,
train_proportion: int | float,
validation_proportion: int | float = 0.0,
test_proportion: int | float = 0.0,
root_folder: Optional[str] = None,
rnd_seed: Optional[int] = None,
name: Optional[str] = None,
hdf5_file_location: str = "data/virgo_data.hdf5",
@@ -242,7 +158,6 @@ def __init__(
super().__init__(train_proportion, validation_proportion, test_proportion, name)
self.save_parameters(**self.locals2params(locals()))
self.rnd_seed = rnd_seed
self.root_folder = root_folder
self.hdf5_file_location = hdf5_file_location
self.hdf5_dataset_name = hdf5_dataset_name
self.chunk_size = chunk_size
@@ -258,8 +173,8 @@ def execute(self) -> Tuple[Dataset, Dataset, Dataset]:
Tuple[Dataset, Dataset, Dataset]: Training, validation, and test datasets.
"""

whole_dataset = SyntheticTimeSeriesDataset2(
file=self.hdf5_file_location,
whole_dataset = SyntheticTimeSeriesDatasetHDF5(
hdf5_file_location=self.hdf5_file_location,
chunk_size=self.chunk_size,
hdf5_dataset_name=self.hdf5_dataset_name
)
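
How the splitter turns these proportions into actual splits is not part of this diff; a self-contained sketch of one plausible approach, using torch's random_split with a seeded generator (the stand-in dataset and names are illustrative only):

import torch
from torch.utils.data import TensorDataset, random_split

# Stand-in for the HDF5-backed dataset, just to make the sketch runnable
whole_dataset = TensorDataset(torch.randn(100, 4, 64, 64))

n = len(whole_dataset)
n_train = int(0.9 * n)
n_val = int(0.1 * n)
n_test = n - n_train - n_val  # whatever remains
generator = torch.Generator().manual_seed(42)
train_set, val_set, test_set = random_split(
    whole_dataset, [n_train, n_val, n_test], generator=generator
)
print(len(train_set), len(val_set), len(test_set))  # 90 10 0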
82 changes: 82 additions & 0 deletions use-cases/virgo/synthetic_data_gen/concat_hdf5_dataset_files.py
@@ -0,0 +1,82 @@
import argparse
import re
import h5py
import numpy as np
from pathlib import Path

def append_to_hdf5_dataset(
file_path: Path,
dataset_name: str,
array: np.ndarray,
expected_datapoint_shape: tuple,
):
if tuple(array.shape[1:]) != expected_datapoint_shape:
actual_shape_str = ", ".join(str(s) for s in array.shape)
expected_shape_str = ", ".join(str(s) for s in expected_datapoint_shape)
raise ValueError(
f"'array' has an incorrect shape: ({actual_shape_str}). "
f"Should have been (x, {expected_shape_str})."
)

print(f"Appending to file: '{str(file_path.resolve())}'.")
with h5py.File(file_path, "a") as f:
dset = f[dataset_name]
dset.resize(dset.shape[0] + array.shape[0], axis=0)
dset[-array.shape[0] :] = array

def main():
parser = argparse.ArgumentParser(description="Virgo Dataset Generation")
parser.add_argument(
"--dir", type=str, help="Directory containing the HDF5 files to concatenate"
)
parser.add_argument(
"--save-location",
type=str,
help="Location to save the resulting HDF5 file.",
default="total_virgo_data.hdf5",
)
args = parser.parse_args()
dir = Path(args.dir)
save_location = Path(args.save_location)
num_aux_channels = 3
square_size = 64
dataset_name = "virgo_dataset"

# Creating empty HDF5 file
datapoint_shape = (num_aux_channels + 1, square_size, square_size)
save_location.parent.mkdir(parents=True, exist_ok=True)

print(f"Creating/overwriting file: '{save_location.resolve()}'.")
with h5py.File(save_location, "w") as f:
dataset = f.create_dataset(
dataset_name,
shape=(0, *datapoint_shape),
maxshape=(None, *datapoint_shape),
dtype=np.float32,
)
dataset.attrs["Description"] = (
"Synthetic time series data for the Virgo use case"
)
dataset.attrs["main_channel_idx"] = 0

entries = []
# NOTE: This will not necessarily iterate in the same order as the numeric
# suffixes of the file names
for entry in dir.iterdir():
if not entry.suffix == ".hdf5":
continue

print(f"Reading entry: {entry}")
with h5py.File(entry, "r") as f:
data = f[dataset_name][:]

append_to_hdf5_dataset(
file_path=save_location,
dataset_name=dataset_name,
array=data,
expected_datapoint_shape=datapoint_shape
)


if __name__ == "__main__":
main()
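
As the NOTE above says, Path.iterdir() yields files in no guaranteed order. The append order likely does not matter for training, but if a deterministic order were wanted, the already-imported re module could sort on the numeric suffix; a hypothetical helper, not part of the commit (the directory path is only an example):

import re
from pathlib import Path


def numeric_suffix(path: Path) -> int:
    """Return the trailing integer of names like 'virgo_data_17.hdf5', or -1."""
    match = re.search(r"_(\d+)\.hdf5$", path.name)
    return int(match.group(1)) if match else -1


hdf5_files = sorted(
    (p for p in Path("/p/scratch/intertwin/datasets/virgo_hdf5").iterdir()
     if p.suffix == ".hdf5"),
    key=numeric_suffix,
)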
26 changes: 0 additions & 26 deletions use-cases/virgo/synthetic_data_gen/data_generation.sh

This file was deleted.

24 changes: 24 additions & 0 deletions use-cases/virgo/synthetic_data_gen/data_generation_hdf5.sh
@@ -0,0 +1,24 @@
#!/bin/bash

#SBATCH --account=intertwin
#SBATCH --output=array_job.out
#SBATCH --error=array_job.err
#SBATCH --time=01:00:00
#SBATCH --mem-per-cpu=1G
#SBATCH --partition=develbooster
#SBATCH --array=1-250
#SBATCH --job-name=generate_virgo_data

# Load required modules
ml Stages/2024 GCC OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py

# Activate Python virtual environment
source ../../envAI_hdfml/bin/activate

# Folder in which the datasets will be stored
target_file="/p/scratch/intertwin/datasets/virgo_hdf5/virgo_data_${SLURM_ARRAY_TASK_ID}.hdf5"

srun python synthetic_data_gen/file_gen_hdf5.py \
--num-datapoints 3000 \
--save-frequency 20 \
--save-location "$target_file"
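
The array launches 250 independent generation tasks but does not pass the new --seed flag, so every task draws from an unseeded NumPy state. If reproducible yet distinct files per task were wanted, one hypothetical approach (not in this commit) would be to derive the seed from the task ID:

import os

import numpy as np

# Hypothetical: combine a base seed with the SLURM array task ID so that each
# task gets a different but reproducible random stream.
base_seed = 42
task_id = int(os.environ.get("SLURM_ARRAY_TASK_ID", "0"))
np.random.seed(base_seed + task_id)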
use-cases/virgo/synthetic_data_gen/file_gen_hdf5.py
@@ -14,7 +14,6 @@

from src.dataset import generate_cut_image_dataset


def append_to_hdf5_dataset(
file_path: Path,
dataset_name: str,
@@ -36,7 +35,7 @@ def append_to_hdf5_dataset(
dset[-array.shape[0] :] = array


def generate_pkl_dataset(
def generate_hdf5_dataset(
output_file: str = "virgo_data.hdf5",
dataset_name: str = "virgo_dataset",
num_datapoints=5,
@@ -48,6 +47,7 @@ def generate_pkl_dataset(
num_processes=4,
square_size=64,
datapoints_per_file=10,
seed=None
) -> None:
"""Generate a folder with num_files pickle files containing synthetic gravitational waves data.
@@ -64,12 +64,15 @@
square_size (int): Size in pixels of qplot image (default is 500 samples per second).
datapoints_per_file (int): number of independent datapoints per pickle file.
"""
if seed is not None:
np.random.seed(seed)

datapoints = []

# Creating empty HDF5 file
datapoint_shape = (num_aux_channels + 1, square_size, square_size)
output_file_path = Path(output_file)
output_file_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Creating/overwriting file: '{output_file_path.resolve()}'.")
with h5py.File(output_file_path, "w") as f:
dataset = f.create_dataset(
@@ -85,7 +88,6 @@

for f in tqdm(range(num_datapoints)):
times = np.linspace(0, duration, duration * sample_rate)

# Initialise the main data as a list of zeros
main_data = np.zeros(len(times))
dictionary_aux = {}
@@ -188,10 +190,16 @@ def generate_pkl_dataset(
default="virgo_data.hdf5",
)

parser.add_argument(
"--seed",
type=int,
help="Seed for random number generator"
)

args = parser.parse_args()

start = time()
generate_pkl_dataset(
generate_hdf5_dataset(
output_file=args.save_location,
num_datapoints=args.num_datapoints,
duration=16,
@@ -201,7 +209,10 @@ def generate_pkl_dataset(
noise_amplitude=0.5,
datapoints_per_file=args.save_frequency,
num_processes=1,
seed=args.seed
)
end = time()
total_time = end - start
print(f"Generation took {total_time:.2f} seconds!")

