diff --git a/.gitignore b/.gitignore index 43543abc..45bf5388 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,8 @@ tmp* *.txt checkpoints/ mamba* +MNIST +mllogs # Custom envs .venv* diff --git a/Makefile b/Makefile index b256a9cd..ee8db5de 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ dev-env: dev-env.yml ai/setup.py # Create pytorch env under ./ai/ folder ai-env: ai/env-files/pytorch-lock.yml ai/setup.py - micromamba env create -p ./ai/.venv-pytorch --file ai/env-files/pytorch-lock.yml -y + micromamba env create -p ./ai/.venv-pytorch --file ai/env-files/pytorch-gpu-lock.yml -y micromamba run -p ./ai/.venv-pytorch python -m pip install -e ./ai lock-ai: ai/env-files/pytorch-env.yml ai/env-files/pytorch-env-gpu.yml diff --git a/ai/env-files/pytorch-env-gpu.yml b/ai/env-files/pytorch-env-gpu.yml index 73661948..efa02c52 100644 --- a/ai/env-files/pytorch-env-gpu.yml +++ b/ai/env-files/pytorch-env-gpu.yml @@ -20,6 +20,7 @@ dependencies: - lightning=2.0.0 - torchmetrics - mlflow>=2 + - wandb - typer - rich - pyyaml diff --git a/ai/env-files/pytorch-env.yml b/ai/env-files/pytorch-env.yml index 1266fec1..63298803 100644 --- a/ai/env-files/pytorch-env.yml +++ b/ai/env-files/pytorch-env.yml @@ -15,6 +15,7 @@ dependencies: - lightning=2.0.0 - torchmetrics - mlflow>=2 + - wandb - typer - rich - pyyaml diff --git a/ai/src/itwinai/backend/components.py b/ai/src/itwinai/backend/components.py index adc00e4d..16d506c6 100644 --- a/ai/src/itwinai/backend/components.py +++ b/ai/src/itwinai/backend/components.py @@ -58,6 +58,8 @@ def setup(self, pipeline): class Logger(metaclass=ABCMeta): + savedir: str = None + @abstractmethod def log(self, args): pass diff --git a/ai/src/itwinai/backend/torch/dist_trainer.py b/ai/src/itwinai/backend/torch/dist_trainer.py index 4393d705..3c658b02 100644 --- a/ai/src/itwinai/backend/torch/dist_trainer.py +++ b/ai/src/itwinai/backend/torch/dist_trainer.py @@ -32,13 +32,13 @@ def pars_ini(): # IO parsers parser.add_argument('--data-dir', default='./', help='location of the training dataset in the local filesystem') - # parser.add_argument('--restart-int', type=int, default=10, - # help='restart interval per epoch (default: 10)') + parser.add_argument('--restart-int', type=int, default=1, + help='restart interval per epoch (default: 10)') # model parsers parser.add_argument('--batch-size', type=int, default=64, help='input batch size for training (default: 64)') - parser.add_argument('--epochs', type=int, default=1, + parser.add_argument('--epochs', type=int, default=10, help='number of epochs to train (default: 10)') parser.add_argument('--lr', type=float, default=0.01, help='learning rate (default: 0.01)') @@ -90,7 +90,7 @@ def save_state(epoch, distrib_model, loss_acc, optimizer, res_name, grank, gwsiz # collect state state = {'epoch': epoch + 1, 'state_dict': distrib_model.state_dict(), - 'best_acc': loss_acc, + 'best_loss': loss_acc, 'optimizer': optimizer.state_dict()} # write on worker with is_best @@ -102,7 +102,7 @@ def save_state(epoch, distrib_model, loss_acc, optimizer, res_name, grank, gwsiz # collect state state = {'epoch': epoch + 1, 'state_dict': distrib_model.state_dict(), - 'best_acc': loss_acc, + 'best_loss': loss_acc, 'optimizer': optimizer.state_dict()} torch.save(state, './'+res_name) @@ -124,6 +124,7 @@ def seed_worker(worker_id): def par_allgather_obj(obj, gwsize): res = [None]*gwsize dist.all_gather_object(res, obj, group=None) + print(f'ALLGATHER: {res}') return res # # @@ -283,7 +284,7 @@ def main(): # resume state start_epoch = 1 - best_acc 
= np.Inf + best_loss = np.Inf res_name = 'checkpoint.pth.tar' if os.path.isfile(res_name) and not args.benchrun: try: @@ -297,7 +298,7 @@ def main(): else: checkpoint = torch.load(program_dir+'/'+res_name) start_epoch = checkpoint['epoch'] - best_acc = checkpoint['best_acc'] + best_loss = checkpoint['best_loss'] distrib_model.load_state_dict(checkpoint['state_dict']) optimizer.load_state_dict(checkpoint['optimizer']) if torch.cuda.is_available(): @@ -313,7 +314,7 @@ def main(): print(f'WARNING: restart file cannot be loaded, restarting!') if start_epoch >= args.epochs+1: - if args.cuda.is_available(): + if torch.cuda.is_available(): if grank == 0: print(f'WARNING: given epochs are less than the one in the restart file!\n' f'WARNING: SYS.EXIT is issued') @@ -367,12 +368,12 @@ def main(): sort_by='self_'+str(what1)+'_time_total')) # save state if found a better state - is_best = loss_acc < best_acc + is_best = loss_acc < best_loss if epoch % args.restart_int == 0 and not args.benchrun: save_state(epoch, distrib_model, loss_acc, optimizer, res_name, grank, gwsize, is_best) # reset best_acc - best_acc = min(loss_acc, best_acc) + best_loss = min(loss_acc, best_loss) # finalise # save final state diff --git a/ai/src/itwinai/backend/torch/loggers.py b/ai/src/itwinai/backend/torch/loggers.py index 86aefd33..64893889 100644 --- a/ai/src/itwinai/backend/torch/loggers.py +++ b/ai/src/itwinai/backend/torch/loggers.py @@ -1,3 +1,4 @@ +import os import wandb import mlflow import mlflow.keras @@ -19,3 +20,34 @@ def __init__(self): def log(self, args): mlflow.pytorch.autolog() + + +class BaseLogger(Logger): + def __init__( + self, + savedir: str = 'mllogs', + create_new: bool = True + ) -> None: + super().__init__() + self.savedir = savedir + + # From now on, very spaghetti... + os.makedirs(self.savedir, exist_ok=True) + if create_new: + run_dirs = sorted(os.listdir(self.savedir)) + if len(run_dirs) == 0: + self.run_id = 0 + else: + self.run_id = int(run_dirs[-1]) + 1 + self.run_path = os.path.join(self.savedir, str(self.run_id)) + os.makedirs(self.run_path) + else: + # "Wait" for the process 0 to create the run folder... 
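The run-folder logic in this BaseLogger boils down to: rank 0 allocates the next integer run id under savedir, while every other worker waits briefly and reuses the newest folder. A minimal sketch of that id allocation, assuming run folders are named by their integer id (the helper name next_run_id and the numeric comparison are illustrative, not part of BaseLogger):

import os

def next_run_id(savedir: str) -> int:
    """Return the next free integer run id under savedir (0 if empty)."""
    os.makedirs(savedir, exist_ok=True)
    run_ids = [int(d) for d in os.listdir(savedir) if d.isdigit()]
    # Compare numerically rather than lexicographically, so that 10 follows 9
    return max(run_ids) + 1 if run_ids else 0

# Rank 0 would then create os.path.join(savedir, str(next_run_id(savedir)));
# the remaining ranks sleep briefly and reuse the largest existing id.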
+ import time + time.sleep(0.1) + run_dirs = sorted(os.listdir(self.savedir)) + self.run_id = int(run_dirs[-1]) + self.run_path = os.path.join(self.savedir, str(self.run_id)) + + def log(self, args): + pass diff --git a/ai/src/itwinai/backend/torch/test.py b/ai/src/itwinai/backend/torch/test_basic_distrib.py similarity index 82% rename from ai/src/itwinai/backend/torch/test.py rename to ai/src/itwinai/backend/torch/test_basic_distrib.py index 1a02e0bd..e9bf7e59 100644 --- a/ai/src/itwinai/backend/torch/test.py +++ b/ai/src/itwinai/backend/torch/test_basic_distrib.py @@ -1,4 +1,6 @@ -"""test""" +""" +Test basic distribution strategies from example online +""" import os import torch @@ -16,7 +18,7 @@ def setup(rank, world_size): os.environ['MASTER_PORT'] = '12355' # initialize the process group - dist.init_process_group("gloo", rank=rank, world_size=world_size) + dist.init_process_group("nccl", rank=rank, world_size=world_size) def cleanup(): @@ -41,6 +43,14 @@ def demo_basic(rank, world_size): # create model and move it to GPU with id rank model = ToyModel().to(rank) ddp_model = DDP(model, device_ids=[rank]) + # if rank == 0: + # p = next(iter(model.parameters())) + # print((hash(p), id(p))) + + # p = next(iter(ddp_model.parameters())) + # print((hash(p), id(p))) + # cleanup() + # return loss_fn = nn.MSELoss() optimizer = optim.SGD(ddp_model.parameters(), lr=0.001) diff --git a/ai/src/itwinai/backend/torch/test_decorator.py b/ai/src/itwinai/backend/torch/test_decorator.py new file mode 100644 index 00000000..296d57df --- /dev/null +++ b/ai/src/itwinai/backend/torch/test_decorator.py @@ -0,0 +1,109 @@ +""" +Test Trainer class. To run this script, use the following command: + +>>> torchrun --nnodes=1 --nproc_per_node=2 --rdzv_id=100 --rdzv_backend=c10d \ + --rdzv_endpoint=localhost:29400 test_decorator.py + +""" + +import torch +from torch import nn +import torch.nn.functional as F +from torch.utils.data import DataLoader +from torchvision import transforms, datasets +import torch.optim as optim +from torch.optim.lr_scheduler import StepLR + +from itwinai.backend.torch.trainer import distributed + + +class Net(nn.Module): + + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(1, 10, kernel_size=5) + self.conv2 = nn.Conv2d(10, 20, kernel_size=5) + self.conv2_drop = nn.Dropout2d() + self.fc1 = nn.Linear(320, 50) + self.fc2 = nn.Linear(50, 10) + + def forward(self, x): + x = F.relu(F.max_pool2d(self.conv1(x), 2)) + x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) + x = x.view(-1, 320) + x = F.relu(self.fc1(x)) + x = F.dropout(x, training=self.training) + x = self.fc2(x) + return F.log_softmax(x, dim=0) + + +def train(model, device, train_loader, optimizer, epoch): + model.train() + for batch_idx, (data, target) in enumerate(train_loader): + data, target = data.to(device), target.to(device) + optimizer.zero_grad() + output = model(data) + loss = F.nll_loss(output, target) + loss.backward() + optimizer.step() + if batch_idx % 100 == 0: + print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( + epoch, batch_idx * len(data), len(train_loader.dataset), + 100. 
* batch_idx / len(train_loader), loss.item())) + + +def test(model, device, test_loader): + model.eval() + test_loss = 0 + correct = 0 + with torch.no_grad(): + for data, target in test_loader: + data, target = data.to(device), target.to(device) + output = model(data) + # sum up batch loss + test_loss += F.nll_loss(output, target, reduction='sum').item() + # get the index of the max log-probability + pred = output.argmax(dim=1, keepdim=True) + correct += pred.eq(target.view_as(pred)).sum().item() + + test_loss /= len(test_loader.dataset) + + print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format( + test_loss, correct, len(test_loader.dataset), + 100. * correct / len(test_loader.dataset))) + + +@distributed +def train_func( + model, train_dataloader, validation_dataloader, device, + optimizer, scheduler, epochs=10 +): + for epoch in range(1, epochs + 1): + train(model, device, train_dataloader, optimizer, epoch) + test(model, device, validation_dataloader) + scheduler.step() + + +if __name__ == '__main__': + + train_set = datasets.MNIST( + '.tmp/', train=True, download=True, + transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ])) + val_set = datasets.MNIST( + '.tmp/', train=False, download=True, + transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ])) + model = Net() + train_dataloader = DataLoader(train_set, batch_size=32, pin_memory=True) + validation_dataloader = DataLoader(val_set, batch_size=32, pin_memory=True) + optimizer = optim.Adadelta(model.parameters(), lr=1e-3) + scheduler = StepLR(optimizer, step_size=1, gamma=0.9) + + # Train distributed + train_func(model, train_dataloader, validation_dataloader, 'cuda', + optimizer, scheduler=scheduler) diff --git a/ai/src/itwinai/backend/torch/test_trainer.py b/ai/src/itwinai/backend/torch/test_trainer.py new file mode 100644 index 00000000..90aa57d6 --- /dev/null +++ b/ai/src/itwinai/backend/torch/test_trainer.py @@ -0,0 +1,61 @@ +""" +Test Trainer class. 
To run this script, use the following command: + +>>> torchrun --nnodes=1 --nproc_per_node=2 --rdzv_id=100 --rdzv_backend=c10d \ + --rdzv_endpoint=localhost:29400 test_trainer.py + +""" + +from torch import nn +import torch.nn.functional as F +from torch.utils.data import DataLoader +from torchvision import transforms, datasets + +from itwinai.backend.torch.trainer import TorchTrainer + + +class Net(nn.Module): + + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(1, 10, kernel_size=5) + self.conv2 = nn.Conv2d(10, 20, kernel_size=5) + self.conv2_drop = nn.Dropout2d() + self.fc1 = nn.Linear(320, 50) + self.fc2 = nn.Linear(50, 10) + + def forward(self, x): + x = F.relu(F.max_pool2d(self.conv1(x), 2)) + x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) + x = x.view(-1, 320) + x = F.relu(self.fc1(x)) + x = F.dropout(x, training=self.training) + x = self.fc2(x) + return F.log_softmax(x, dim=0) + + +if __name__ == '__main__': + train_set = datasets.MNIST( + '.tmp/', train=True, download=True, + transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ])) + val_set = datasets.MNIST( + '.tmp/', train=False, download=True, + transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ])) + trainer = TorchTrainer( + model=Net(), + train_dataloader=DataLoader(train_set, batch_size=32, pin_memory=True), + validation_dataloader=DataLoader( + val_set, batch_size=32, pin_memory=True), + strategy='ddp', + backend='nccl', + loss='NLLLoss', + epochs=20, + checkpoint_every=1 + ) + trainer.train() diff --git a/ai/src/itwinai/backend/torch/trainer.py b/ai/src/itwinai/backend/torch/trainer.py index dda0ceca..c2baefd5 100644 --- a/ai/src/itwinai/backend/torch/trainer.py +++ b/ai/src/itwinai/backend/torch/trainer.py @@ -1,7 +1,10 @@ from abc import abstractmethod -from typing import Optional, Dict, Union, Iterable -from enum import Enum, EnumMeta +from typing import Optional, Dict, Union, Callable, Tuple, Type, List +import time +import os +import sys +import numpy as np import torch from torch.utils.data import DataLoader @@ -9,41 +12,85 @@ import torch.distributed as dist from torch.nn.parallel import DistributedDataParallel as DDP import torch.nn as nn -import torch.nn.functional as F from torch.optim.optimizer import Optimizer -from ..components import Trainer -from .utils import seed_worker - - -class MetaEnum(EnumMeta): - def __contains__(cls, item): - try: - cls(item) - except ValueError: - return False - return True - +from ..components import Trainer, Logger +from .utils import seed_worker, save_state +from .types import ( + TorchDistributedBackend, + TorchDistributedStrategy, + TorchLoss, TorchOptimizer +) +from .loggers import BaseLogger + +Loss = nn.Module + + +def preproc_dataloader(dataloader: DataLoader, gwsize, grank): + """Make a Dataloader distributed""" + sampler = DistributedSampler( + dataloader.dataset, + num_replicas=gwsize, + rank=grank, + shuffle=True + ) + # Recreate dataloader, with updated sampler + return DataLoader( + dataloader.dataset, + batch_size=dataloader.batch_size, + sampler=sampler, + num_workers=dataloader.num_workers, + collate_fn=dataloader.collate_fn, + pin_memory=dataloader.pin_memory, + drop_last=dataloader.drop_last, + timeout=dataloader.timeout, + worker_init_fn=seed_worker, # dataloader.worker_init_fn, + multiprocessing_context=dataloader.multiprocessing_context, + generator=dataloader.generator, + prefetch_factor=dataloader.prefetch_factor, + 
persistent_workers=dataloader.persistent_workers, + pin_memory_device=dataloader.pin_memory_device + ) + + +def distributed(func): + def dist_train(model, train_dataloader, + validation_dataloader=None, device='cpu', + *args, **kwargs): + if torch.cuda.is_available(): + dist.init_process_group(backend='nccl') -class BaseEnum(Enum, metaclass=MetaEnum): - @classmethod - def list(cls): - return list(map(lambda c: c.value, cls)) + if torch.cuda.is_available(): + lwsize = torch.cuda.device_count() # local world size - per node + gwsize = dist.get_world_size() # global world size - per run + grank = dist.get_rank() # global rank - assign per run + lrank = dist.get_rank() % lwsize # local rank - assign per node + else: + gwsize = 1 + grank = 0 + lrank = 0 + device = torch.device( + 'cuda' if torch.cuda.is_available() else 'cpu', lrank) + if torch.cuda.is_available(): + torch.cuda.set_device(lrank) -class TorchDistributedBackend(BaseEnum): - """ - Enum for torch distributed backends. - Reference: https://pytorch.org/docs/stable/distributed.html#backends - """ - GLOO = 'gloo' - NCCL = 'nccl' - MPI = 'mpi' + model = model.to(device) + model = DDP(model, device_ids=[device], output_device=device) + train_dataloader = preproc_dataloader(train_dataloader, gwsize, grank) + if validation_dataloader is not None: + validation_dataloader = preproc_dataloader( + validation_dataloader, gwsize, grank) -class TorchDistributedStrategy(BaseEnum): - NONE = None - DDP = 'ddp' + try: + func(model, train_dataloader, validation_dataloader, *args, + device=device, **kwargs) + finally: + if torch.cuda.is_available(): + dist.barrier() + dist.destroy_process_group() + return dist_train class TorchTrainer(Trainer): @@ -52,25 +99,41 @@ class TorchTrainer(Trainer): Assumes to be executed in a SLURM cluster with torchrun. 
Use the torch elastic version of DDP: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html#initialize-ddp-with-torch-distributed-run-torchrun + + TODO: complete loss function and optimizer defaults """ + model: nn.Module = None + optimizer: Optimizer = None + _loss: Callable = None + train_dataloader: DataLoader = None + validation_dataloader: DataLoader = None + strategy: TorchDistributedStrategy = None + backend: TorchDistributedBackend = 'nccl' + def __init__( - self, - model: nn.Module, - epochs: int, - # learning_rate: float = 1e-3, - # optim_kwargs: Optional[Dict] = None, - testrun: bool = False, - shuffle_data: bool = False, - seed: Optional[int] = None, - log_int: int = 10, - strategy: Optional[TorchDistributedStrategy] = None, - backend: TorchDistributedBackend = 'nccl', - use_cuda: bool = True, - benchrun: bool = False + self, + model: nn.Module, + train_dataloader: DataLoader, + validation_dataloader: Optional[DataLoader] = None, + epochs: int = 1, + loss: Union[Callable, TorchLoss] = 'MSELoss', + optimizer: Union[Optimizer, TorchOptimizer] = 'SGD', + optimizer_kwargs: Optional[Dict] = None, + testrun: bool = False, + shuffle_data: bool = False, + seed: Optional[int] = None, + log_int: int = 10, + strategy: Optional[TorchDistributedStrategy] = None, + backend: TorchDistributedBackend = 'nccl', + use_cuda: bool = True, + benchrun: bool = False, + logger: Optional[List[Logger]] = None, + checkpoint_every: int = 10 ) -> None: self.model = model self.epochs = epochs + self.loss = loss self.testrun = testrun self.seed = seed self.shuffle_data = shuffle_data @@ -79,8 +142,8 @@ def __init__( self.backend = backend self.use_cuda = use_cuda self.benchrun = benchrun - # self.learning_rate = learning_rate - # self.optim_kwargs = optim_kwargs + # Checkpoint every n epochs + self.checkpoint_every = checkpoint_every self.cuda = self.use_cuda and torch.cuda.is_available() @@ -121,9 +184,13 @@ def __init__( self.model = self.model.to(self.device) # Create distributed model - if self.strategy == TorchDistributedStrategy.NONE: + if self.strategy == TorchDistributedStrategy.NONE.value: pass - elif self.strategy == TorchDistributedStrategy.DDP: + elif self.strategy == TorchDistributedStrategy.DDP.value: + if not self.cuda: + raise RuntimeError( + "Cannot use torch distributed data parallel without CUDA." + ) self.model = DDP( self.model, device_ids=[self.device], @@ -132,8 +199,21 @@ def __init__( else: raise NotImplementedError("Only DDP strategy is implemented.") - # # Optimizer - # self.optimizer = ... 
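Before the optimizer wiring below, a point of reference for the torchrun-driven setup that the class docstring assumes: the DDP initialisation typically reduces to the sketch here, relying only on the environment variables exported by torchrun (LOCAL_RANK, RANK, WORLD_SIZE). It is illustrative, not the trainer's own code:

import os

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP


def ddp_wrap(model: torch.nn.Module) -> DDP:
    """Wrap a model with DDP, assuming the process was launched by torchrun."""
    # The default env:// init reads MASTER_ADDR/MASTER_PORT/RANK/WORLD_SIZE
    dist.init_process_group(backend='nccl')
    local_rank = int(os.environ['LOCAL_RANK'])  # one process per GPU
    torch.cuda.set_device(local_rank)
    return DDP(model.to(local_rank), device_ids=[local_rank],
               output_device=local_rank)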
+ # Optimizers + self.optimizer = self.configure_optimizers( + optim=optimizer, optim_kwargs=optimizer_kwargs) + + # Dataloaders + self.train_dataloader = self._preproc_dataloader(train_dataloader) + if validation_dataloader is not None: + self.validation_dataloader = self._preproc_dataloader( + validation_dataloader + ) + + self.logger = ( + logger if logger is not None + else BaseLogger(create_new=self.grank == 0) + ) @property def backend(self) -> str: @@ -159,6 +239,68 @@ def strategy(self, strategy_name) -> None: f"are: {TorchDistributedStrategy.list()}") self._strategy = strategy_name + @property + def loss(self) -> Callable: + return self._loss + + @loss.setter + def loss(self, loss: Union[Callable, TorchLoss]) -> None: + if hasattr(loss, '__call__'): + self._loss = loss + elif isinstance(loss, str) and loss in TorchLoss: + self._loss = self._default_loss(loss) + else: + raise ValueError( + "Unrecognized loss type (if you gave a string, it has to be case sensitive).") + + def _default_loss(self, loss: TorchLoss) -> Callable: + if loss == TorchLoss.L1.value: + return nn.L1Loss() + if loss == TorchLoss.MSE.value: + return nn.MSELoss() + if loss == TorchLoss.CROSS_ENTROPY.value: + return nn.CrossEntropyLoss() + if loss == TorchLoss.NLLLOSS.value: + return nn.NLLLoss() + + # TODO: support all losses form https://pytorch.org/docs/stable/nn.html#loss-functions + raise NotImplementedError( + "Argh! Support for other losses is still missing...") + + def configure_optimizers( + self, + optim: Union[Optimizer, TorchOptimizer], + optim_kwargs: Optional[Dict] = None + ) -> Optimizer: + if isinstance(optim, Optimizer): + # The optimizer is already instantiated + return optim + + if isinstance(optim, str) and optim in TorchOptimizer: + # Optimizer has to be instantiated from its name and kwargs + optimizer_class, def_args = self._default_optimizer_class(optim) + optim_kwargs = def_args if optim_kwargs is None else def_args.update( + optim_kwargs) + return optimizer_class(self.model.parameters(), **optim_kwargs) + + raise ValueError( + "Unrecognized optimizer type (if you gave a string, " + "it has to be case sensitive)." + ) + + def _default_optimizer_class(self, optim: TorchOptimizer) -> Tuple[Type, Dict]: + """ + Returns optimizer class and a default value for its required construnctor args, if any. + """ + if optim == TorchOptimizer.SGD.value: + return torch.optim.SGD, dict(lr=1e-3) + if optim == TorchOptimizer.ADAM.value: + return torch.optim.Adam, dict() + + # TODO: support all optimizers from https://pytorch.org/docs/stable/optim.html#algorithms + raise NotImplementedError( + "Argh! 
Support for other losses is still missing...") + def setup(self, args) -> None: pass @@ -202,7 +344,6 @@ def _preproc_dataloader(self, dataloader: DataLoader) -> DataLoader: # Recreate dataloader, with updated sampler return DataLoader( dataloader.dataset, - shuffle=self.shuffle_data, batch_size=dataloader.batch_size, sampler=sampler, num_workers=dataloader.num_workers, @@ -218,39 +359,196 @@ def _preproc_dataloader(self, dataloader: DataLoader) -> DataLoader: pin_memory_device=dataloader.pin_memory_device ) - # @abstractmethod - # def configure_optimizers(self) -> Union[Optimizer, Iterable[Optimizer]]: - # pass - - @abstractmethod - def training_step(self, batch, batch_idx): - pass - - @abstractmethod - def validation_step(self, batch, batch_idx): - pass - - def train( - self, - train_dataloader: DataLoader, - validation_dataloader: Optional[DataLoader] = None - ): - train_dataloader = self._preproc_dataloader(train_dataloader) - if validation_dataloader is not None: - validation_dataloader = self._preproc_dataloader( - validation_dataloader - ) + def training_step(self, batch, batch_idx) -> Loss: + x, y = batch + x, y = x.to(self.device), y.to(self.device) + pred_y = self.model(x) + return self.loss(pred_y, y) - # self._optimizers = self.configure_optimizers() + def validation_step(self, batch, batch_idx) -> Loss: + x, y = batch + x, y = x.to(self.device), y.to(self.device) + pred_y = self.model(x) + return self.loss(pred_y, y) + def training_epoch(self, epoch_idx) -> Loss: self.model.train() - for _ in range(self.epochs): - for tr_b_idx, train_batch in enumerate(train_dataloader): - self.training_step(batch=train_batch, batch_idx=tr_b_idx) - if validation_dataloader is not None: - for val_b_idx, val_batch in enumerate(validation_dataloader): - self.validation_step(batch=val_batch, batch_idx=val_b_idx) - - # def optimizers(self) -> Union[Optimizer, Iterable[Optimizer]]: - # """Get optimizers""" - # return self._optimizers + train_losses = [] + # TODO: use tqdm + for tr_b_idx, train_batch in enumerate(self.train_dataloader): + loss = self.training_step( + batch=train_batch, + batch_idx=tr_b_idx + ) + self.optimizer.zero_grad() + loss.backward() + self.optimizer.step() + train_losses.append(loss) + avg_loss = torch.mean(torch.stack(train_losses)).detach().cpu() + print(f"Avg train loss: {avg_loss}") + return avg_loss + + def validation_epoch(self, epoch_idx) -> Loss: + if self.validation_dataloader is not None: + self.model.eval() + validation_losses = [] + # TODO: use tqdm + for val_b_idx, val_batch in enumerate(self.validation_dataloader): + loss = self.validation_step( + batch=val_batch, + batch_idx=val_b_idx + ) + validation_losses.append(loss) + avg_loss = torch.mean( + torch.stack(validation_losses) + ).detach().cpu() + print(f"Avg validation loss: {avg_loss}") + return avg_loss + + def train(self): + + if self.optimizer is None: + raise ValueError("Undefined optimizer!") + + if self.loss is None: + raise ValueError("Undefined loss function!") + + st = time.time() + + # Resume state + start_epoch = 1 + best_loss = np.Inf + res_name = os.path.join(self.logger.run_path, 'checkpoint.pth.tar') + if os.path.isfile(res_name) and not self.benchrun: + try: + if torch.cuda.is_available(): + dist.barrier() + # Map model to be loaded to specified single gpu. 
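One detail of the optimizer wiring earlier in this class: _default_optimizer_class returns a class plus default constructor kwargs, and any user-supplied kwargs have to be merged into a fresh dict, since dict.update() returns None. A self-contained sketch of that resolution step (build_optimizer and _OPTIM_DEFAULTS are illustrative names, not the trainer's API):

from typing import Dict, Optional, Tuple, Type

import torch
from torch import nn
from torch.optim import Optimizer

# Defaults table mirroring the SGD/Adam cases handled by the trainer
_OPTIM_DEFAULTS: Dict[str, Tuple[Type[Optimizer], Dict]] = {
    'SGD': (torch.optim.SGD, dict(lr=1e-3)),
    'Adam': (torch.optim.Adam, dict()),
}


def build_optimizer(model: nn.Module, name: str,
                    optim_kwargs: Optional[Dict] = None) -> Optimizer:
    optimizer_class, def_args = _OPTIM_DEFAULTS[name]
    # Merge user kwargs over the defaults without mutating the table entry
    merged = {**def_args, **(optim_kwargs or {})}
    return optimizer_class(model.parameters(), **merged)


# e.g. build_optimizer(nn.Linear(4, 2), 'SGD', dict(momentum=0.9))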
+ loc = {'cuda:%d' % 0: 'cuda:%d' % self.lrank} if self.cuda else { + 'cpu:%d' % 0: 'cpu:%d' % self.lrank} + checkpoint = torch.load(res_name, map_location=loc) + else: + checkpoint = torch.load(res_name, map_location='cpu') + start_epoch = checkpoint['epoch'] + best_loss = checkpoint['best_loss'] + self.model.load_state_dict(checkpoint['state_dict']) + self.optimizer.load_state_dict(checkpoint['optimizer']) + if torch.cuda.is_available(): + if self.grank == 0: + print(f'WARNING: restarting from {start_epoch} epoch') + else: + print(f'WARNING: restarting from {start_epoch} epoch') + except: + if torch.cuda.is_available(): + if self.grank == 0: + print('WARNING: restart file cannot be loaded, restarting!') + else: + print('WARNING: restart file cannot be loaded, restarting!') + + if start_epoch >= self.epochs + 1: + if torch.cuda.is_available(): + if self.grank == 0: + print('WARNING: given epochs are less than the one in the restart file!\n' + 'WARNING: SYS.EXIT is issued') + dist.destroy_process_group() + sys.exit() + else: + print('WARNING: given epochs are less than the one in the restart file!\n' + 'WARNING: SYS.EXIT is issued') + sys.exit() + + # start trainin/testing loop + if self.grank == 0: + print('TIMER: broadcast:', time.time()-st, 's') + print(f'\nDEBUG: start training') + print(f'--------------------------------------------------------') + + et = time.time() + # TODO use tqdm? For distributed situations could be difficult + for epoch_idx in range(start_epoch, self.epochs + 1): + lt = time.time() + + if self.benchrun and epoch_idx == self.epochs: + # profiling (done on last epoch - slower!) + with torch.autograd.profiler.profile(use_cuda=self.cuda, + profile_memory=True) as prof: + train_loss = self.training_epoch(epoch_idx=epoch_idx) + else: + train_loss = self.training_epoch(epoch_idx=epoch_idx) + val_loss = self.validation_epoch(epoch_idx=epoch_idx) + + # save first epoch timer + if epoch_idx == start_epoch: + first_ep_t = time.time()-lt + + # final epoch + if epoch_idx + 1 == self.epochs: + self.train_dataloader.last_epoch = True + self.validation_dataloader.last_epoch = True + + if self.grank == 0: + print('TIMER: epoch time:', time.time()-lt, 's') + if self.benchrun and epoch_idx == self.epochs: + print('\n--------------------------------------------------------') + print('DEBUG: benchmark of last epoch:\n') + what1 = 'cuda' if self.cuda else 'cpu' + print(prof.key_averages().table( + sort_by='self_'+str(what1)+'_time_total')) + + # save state if found a better state + ref_loss = val_loss if val_loss is not None else train_loss + is_best = ref_loss < best_loss + if epoch_idx % self.checkpoint_every == 0 and not self.benchrun: + save_state( + epoch_idx, self.model, ref_loss, self.optimizer, + res_name, self.grank, self.gwsize, is_best + ) + # reset best_acc + best_loss = min(ref_loss, best_loss) + + # save final state + if not self.benchrun: + save_state( + epoch_idx, self.model, ref_loss, + self.optimizer, res_name, self.grank, self.gwsize, True + ) + if torch.cuda.is_available(): + dist.barrier() + + # some debug + if self.grank == 0: + print('\n--------------------------------------------------------') + print('DEBUG: training results:\n') + print('TIMER: first epoch time:', first_ep_t, ' s') + print('TIMER: last epoch time:', time.time()-lt, ' s') + print('TIMER: average epoch time:', + (time.time()-et)/self.epochs, ' s') + print('TIMER: total epoch time:', time.time()-et, ' s') + if epoch_idx > 1: + print('TIMER: total epoch-1 time:', + time.time()-et-first_ep_t, ' 
s') + print('TIMER: average epoch-1 time:', + (time.time()-et-first_ep_t)/(self.epochs-1), ' s') + if self.benchrun: + print('TIMER: total epoch-2 time:', lt-first_ep_t, ' s') + print('TIMER: average epoch-2 time:', + (lt-first_ep_t)/(self.epochs-2), ' s') + print('DEBUG: memory req:', int(torch.cuda.memory_reserved(self.lrank)/1024/1024), 'MB') \ + if self.cuda else 'DEBUG: memory req: - MB' + print('DEBUG: memory summary:\n\n', + torch.cuda.memory_summary(0)) if self.cuda else '' + + if self.grank == 0: + print(f'TIMER: final time: {time.time()-st} s\n') + + # TODO: use a with? + self.cleanup() + + def cleanup(self): + """ + Destroy a given process group, and deinitialize the distributed + package. + """ + if torch.cuda.is_available(): + dist.barrier() + dist.destroy_process_group() diff --git a/ai/src/itwinai/backend/torch/types.py b/ai/src/itwinai/backend/torch/types.py new file mode 100644 index 00000000..e749f7f5 --- /dev/null +++ b/ai/src/itwinai/backend/torch/types.py @@ -0,0 +1,51 @@ +from enum import Enum, EnumMeta + + +class MetaEnum(EnumMeta): + def __contains__(cls, item): + try: + cls(item) + except ValueError: + return False + return True + + +class BaseEnum(Enum, metaclass=MetaEnum): + @classmethod + def list(cls): + return list(map(lambda c: c.value, cls)) + + +class TorchDistributedBackend(BaseEnum): + """ + Enum for torch distributed backends. + Reference: https://pytorch.org/docs/stable/distributed.html#backends + """ + GLOO = 'gloo' + NCCL = 'nccl' + MPI = 'mpi' + + +class TorchDistributedStrategy(BaseEnum): + NONE = None + DDP = 'ddp' + + +class TorchLoss(BaseEnum): + """ + Torch loss class names. + TODO: complete from https://pytorch.org/docs/stable/nn.html#loss-functions + """ + L1 = 'L1Loss' + MSE = 'MSELoss' + CROSS_ENTROPY = 'CrossEntropyLoss' + NLLLOSS = 'NLLLoss' + + +class TorchOptimizer(BaseEnum): + """ + Torch optimizer class names. 
+    TODO: complete from https://pytorch.org/docs/stable/optim.html#algorithms
+    """
+    SGD = 'SGD'
+    ADAM = 'Adam'
diff --git a/ai/src/itwinai/backend/torch/utils.py b/ai/src/itwinai/backend/torch/utils.py
index 766e9886..8ed17b44 100644
--- a/ai/src/itwinai/backend/torch/utils.py
+++ b/ai/src/itwinai/backend/torch/utils.py
@@ -1,6 +1,4 @@
 # std libs
-import sys
-import os
 import time
 import numpy as np
 import random
@@ -8,14 +6,12 @@
 # ml libs
 import torch
 import torch.distributed as dist
-import torch.nn as nn
-import torch.nn.functional as F
-import torch.optim as optim
-from torchvision import datasets, transforms


-def save_state(epoch, distrib_model, loss_acc, optimizer, res_name, grank, gwsize, is_best):
-    """save state of the training"""
+def save_state(
+    epoch, distrib_model, loss_val, optimizer, res_name, grank, gwsize, is_best
+):
+    """Save training state"""
     rt = time.time()
     # find if is_best happened in any worker
     if torch.cuda.is_available():
@@ -24,29 +20,30 @@ def save_state(epoch, distrib_model, loss_acc, optimizer, res_name, grank, gwsiz
     if torch.cuda.is_available():
         if any(is_best_m):
             # find which rank is_best happened - select first rank if multiple
-            is_best_rank = np.where(np.array(is_best_m) == True)[0][0]
+            is_best_rank = np.where(np.array(is_best_m))[0][0]
             # collect state
             state = {'epoch': epoch + 1,
                      'state_dict': distrib_model.state_dict(),
-                     'best_acc': loss_acc,
+                     'best_loss': loss_val,
                      'optimizer': optimizer.state_dict()}
             # write on worker with is_best
             if grank == is_best_rank:
                 torch.save(state, './'+res_name)
-                print(
-                    f'DEBUG: state in {grank} is saved on epoch:{epoch} in {time.time()-rt} s')
+                print(f'DEBUG: state in {grank} is saved on '
+                      f'epoch:{epoch} in {time.time()-rt} s')
     else:
         # collect state
         state = {'epoch': epoch + 1,
                  'state_dict': distrib_model.state_dict(),
-                 'best_acc': loss_acc,
+                 'best_loss': loss_val,
                  'optimizer': optimizer.state_dict()}
         torch.save(state, './'+res_name)
         print(
-            f'DEBUG: state in {grank} is saved on epoch:{epoch} in {time.time()-rt} s')
+            f'DEBUG: state in {grank} is saved on epoch:{epoch} '
+            f'in {time.time()-rt} s')


 def seed_worker(worker_id):
@@ -60,4 +57,5 @@ def par_allgather_obj(obj, gwsize):
     """gathers any object from the whole group in a list (to all workers)"""
     res = [None]*gwsize
     dist.all_gather_object(res, obj, group=None)
+    # print(f'ALLGATHER: {res}')
     return res
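For completeness, a standalone sketch (not code from the package) of reloading a checkpoint written with the 'epoch' / 'state_dict' / 'best_loss' / 'optimizer' layout used by save_state, remapping storages onto whatever device is available locally:

import torch
from torch import nn

model = nn.Linear(4, 2)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

# Save in the same layout as save_state
state = {'epoch': 3,
         'state_dict': model.state_dict(),
         'best_loss': 0.42,
         'optimizer': optimizer.state_dict()}
torch.save(state, 'checkpoint.pth.tar')

# Resume: map_location moves GPU tensors onto the local device (or CPU)
loc = 'cuda:0' if torch.cuda.is_available() else 'cpu'
checkpoint = torch.load('checkpoint.pth.tar', map_location=loc)
model.load_state_dict(checkpoint['state_dict'])
optimizer.load_state_dict(checkpoint['optimizer'])
start_epoch = checkpoint['epoch']
best_loss = checkpoint['best_loss']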