Commit

ADD: distributed torch Trainer and decorator

Matteo Bunino committed Aug 9, 2023
1 parent c744f85 commit 57b415b

Showing 13 changed files with 676 additions and 110 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -8,6 +8,8 @@ tmp*
 *.txt
 checkpoints/
 mamba*
+MNIST
+mllogs
 
 # Custom envs
 .venv*
2 changes: 1 addition & 1 deletion Makefile
@@ -9,7 +9,7 @@ dev-env: dev-env.yml ai/setup.py
 
 # Create pytorch env under ./ai/ folder
 ai-env: ai/env-files/pytorch-lock.yml ai/setup.py
-	micromamba env create -p ./ai/.venv-pytorch --file ai/env-files/pytorch-lock.yml -y
+	micromamba env create -p ./ai/.venv-pytorch --file ai/env-files/pytorch-gpu-lock.yml -y
 	micromamba run -p ./ai/.venv-pytorch python -m pip install -e ./ai
 
 lock-ai: ai/env-files/pytorch-env.yml ai/env-files/pytorch-env-gpu.yml
1 change: 1 addition & 0 deletions ai/env-files/pytorch-env-gpu.yml
@@ -20,6 +20,7 @@ dependencies:
   - lightning=2.0.0
   - torchmetrics
   - mlflow>=2
+  - wandb
   - typer
   - rich
   - pyyaml
1 change: 1 addition & 0 deletions ai/env-files/pytorch-env.yml
@@ -15,6 +15,7 @@ dependencies:
   - lightning=2.0.0
   - torchmetrics
   - mlflow>=2
+  - wandb
   - typer
   - rich
   - pyyaml
2 changes: 2 additions & 0 deletions ai/src/itwinai/backend/components.py
@@ -58,6 +58,8 @@ def setup(self, pipeline):
 
 
 class Logger(metaclass=ABCMeta):
+    savedir: str = None
+
     @abstractmethod
     def log(self, args):
         pass
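The two added lines give every Logger implementation a class-level savedir attribute with a shared default. A minimal sketch of a concrete subclass built on this interface (the ConsoleLogger name is hypothetical, not part of the commit):

from itwinai.backend.components import Logger


class ConsoleLogger(Logger):
    """Hypothetical example: writes log arguments to stdout."""

    def __init__(self, savedir: str = 'mllogs') -> None:
        self.savedir = savedir

    def log(self, args):
        # Satisfies the abstract interface; a real logger would persist args.
        print(f'[{self.savedir}] {args}')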
21 changes: 11 additions & 10 deletions ai/src/itwinai/backend/torch/dist_trainer.py
@@ -32,13 +32,13 @@ def pars_ini():
     # IO parsers
     parser.add_argument('--data-dir', default='./',
                         help='location of the training dataset in the local filesystem')
-    # parser.add_argument('--restart-int', type=int, default=10,
-    #                     help='restart interval per epoch (default: 10)')
+    parser.add_argument('--restart-int', type=int, default=1,
+                        help='restart interval per epoch (default: 10)')
 
     # model parsers
     parser.add_argument('--batch-size', type=int, default=64,
                         help='input batch size for training (default: 64)')
-    parser.add_argument('--epochs', type=int, default=1,
+    parser.add_argument('--epochs', type=int, default=10,
                         help='number of epochs to train (default: 10)')
     parser.add_argument('--lr', type=float, default=0.01,
                         help='learning rate (default: 0.01)')
@@ -90,7 +90,7 @@ def save_state(epoch, distrib_model, loss_acc, optimizer, res_name, grank, gwsize, is_best):
     # collect state
     state = {'epoch': epoch + 1,
              'state_dict': distrib_model.state_dict(),
-             'best_acc': loss_acc,
+             'best_loss': loss_acc,
              'optimizer': optimizer.state_dict()}
 
     # write on worker with is_best
@@ -102,7 +102,7 @@
         # collect state
         state = {'epoch': epoch + 1,
                  'state_dict': distrib_model.state_dict(),
-                 'best_acc': loss_acc,
+                 'best_loss': loss_acc,
                  'optimizer': optimizer.state_dict()}
 
         torch.save(state, './'+res_name)
@@ -124,6 +124,7 @@ def seed_worker(worker_id):
 def par_allgather_obj(obj, gwsize):
     res = [None]*gwsize
     dist.all_gather_object(res, obj, group=None)
+    print(f'ALLGATHER: {res}')
     return res
 #
 #
@@ -283,7 +284,7 @@ def main():
 
     # resume state
     start_epoch = 1
-    best_acc = np.Inf
+    best_loss = np.Inf
     res_name = 'checkpoint.pth.tar'
     if os.path.isfile(res_name) and not args.benchrun:
         try:
@@ -297,7 +298,7 @@
             else:
                 checkpoint = torch.load(program_dir+'/'+res_name)
             start_epoch = checkpoint['epoch']
-            best_acc = checkpoint['best_acc']
+            best_loss = checkpoint['best_loss']
             distrib_model.load_state_dict(checkpoint['state_dict'])
             optimizer.load_state_dict(checkpoint['optimizer'])
             if torch.cuda.is_available():
@@ -313,7 +314,7 @@
             print(f'WARNING: restart file cannot be loaded, restarting!')
 
     if start_epoch >= args.epochs+1:
-        if args.cuda.is_available():
+        if torch.cuda.is_available():
             if grank == 0:
                 print(f'WARNING: given epochs are less than the one in the restart file!\n'
                       f'WARNING: SYS.EXIT is issued')
@@ -367,12 +368,12 @@ def main():
                 sort_by='self_'+str(what1)+'_time_total'))
 
         # save state if found a better state
-        is_best = loss_acc < best_acc
+        is_best = loss_acc < best_loss
         if epoch % args.restart_int == 0 and not args.benchrun:
             save_state(epoch, distrib_model, loss_acc, optimizer,
                        res_name, grank, gwsize, is_best)
             # reset best_acc
-            best_acc = min(loss_acc, best_acc)
+            best_loss = min(loss_acc, best_loss)
 
     # finalise
     # save final state
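Throughout dist_trainer.py the checkpoint field and tracking variable are renamed from best_acc to best_loss, matching what they actually hold: the running minimum of the epoch loss. A condensed sketch of the save-on-improvement pattern used above, with a toy model and a random value standing in for the real training loop:

import numpy as np
import torch
from torch import nn, optim

model = nn.Linear(4, 1)                  # stand-in for distrib_model
optimizer = optim.SGD(model.parameters(), lr=0.01)
best_loss = np.inf
res_name = 'checkpoint.pth.tar'

for epoch in range(1, 11):
    loss_acc = float(torch.rand(1))      # stand-in for the epoch's average loss
    is_best = loss_acc < best_loss       # checkpoint only on improvement
    if is_best:
        torch.save({'epoch': epoch + 1,
                    'state_dict': model.state_dict(),
                    'best_loss': loss_acc,
                    'optimizer': optimizer.state_dict()}, res_name)
    best_loss = min(loss_acc, best_loss)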
32 changes: 32 additions & 0 deletions ai/src/itwinai/backend/torch/loggers.py
@@ -1,3 +1,4 @@
+import os
 import wandb
 import mlflow
 import mlflow.keras
@@ -19,3 +20,34 @@ def __init__(self):
 
     def log(self, args):
         mlflow.pytorch.autolog()
+
+
+class BaseLogger(Logger):
+    def __init__(
+        self,
+        savedir: str = 'mllogs',
+        create_new: bool = True
+    ) -> None:
+        super().__init__()
+        self.savedir = savedir
+
+        # From now on, very spaghetti...
+        os.makedirs(self.savedir, exist_ok=True)
+        if create_new:
+            run_dirs = sorted(os.listdir(self.savedir))
+            if len(run_dirs) == 0:
+                self.run_id = 0
+            else:
+                self.run_id = int(run_dirs[-1]) + 1
+            self.run_path = os.path.join(self.savedir, str(self.run_id))
+            os.makedirs(self.run_path)
+        else:
+            # "Wait" for the process 0 to create the run folder...
+            import time
+            time.sleep(0.1)
+            run_dirs = sorted(os.listdir(self.savedir))
+            self.run_id = int(run_dirs[-1])
+            self.run_path = os.path.join(self.savedir, str(self.run_id))
+
+    def log(self, args):
+        pass
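The new BaseLogger assigns incrementing integer run ids under savedir: with create_new=True it creates the next run folder, while create_new=False sleeps briefly and attaches to the latest one, the commit's workaround for letting rank 0 create the directory before the other ranks look for it. (The lexical sorted() call is one of the rough edges the "very spaghetti" comment flags: run '10' sorts before '9'.) A minimal per-rank usage sketch, assuming torch.distributed is already initialized:

import torch.distributed as dist

from itwinai.backend.torch.loggers import BaseLogger

rank = dist.get_rank()
# Rank 0 creates mllogs/<run_id>; the other ranks reuse the folder it made.
logger = BaseLogger(savedir='mllogs', create_new=(rank == 0))
print(f'rank {rank} logs to {logger.run_path}')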
@@ -1,4 +1,6 @@
-"""test"""
+"""
+Test basic distribution strategies from example online
+"""
 import os
 
 import torch
@@ -16,7 +18,7 @@ def setup(rank, world_size):
     os.environ['MASTER_PORT'] = '12355'
 
     # initialize the process group
-    dist.init_process_group("gloo", rank=rank, world_size=world_size)
+    dist.init_process_group("nccl", rank=rank, world_size=world_size)
 
 
 def cleanup():
@@ -41,6 +43,14 @@ def demo_basic(rank, world_size):
     # create model and move it to GPU with id rank
     model = ToyModel().to(rank)
     ddp_model = DDP(model, device_ids=[rank])
+    # if rank == 0:
+    # p = next(iter(model.parameters()))
+    # print((hash(p), id(p)))
+
+    # p = next(iter(ddp_model.parameters()))
+    # print((hash(p), id(p)))
+    # cleanup()
+    # return
 
     loss_fn = nn.MSELoss()
     optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
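Switching the process-group backend from "gloo" to "nccl" selects the CUDA-aware backend, which is needed for efficient GPU collectives but fails on CPU-only machines. A hedged variant of the setup function that guards the choice (the pick_backend helper is illustrative, not part of the commit):

import os

import torch
import torch.distributed as dist


def pick_backend() -> str:
    # nccl requires GPUs; fall back to gloo on CPU-only machines.
    return 'nccl' if torch.cuda.is_available() else 'gloo'


def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    # initialize the process group with a backend matching the hardware
    dist.init_process_group(pick_backend(), rank=rank, world_size=world_size)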
109 changes: 109 additions & 0 deletions ai/src/itwinai/backend/torch/test_decorator.py
@@ -0,0 +1,109 @@
"""
Test Trainer class. To run this script, use the following command:
>>> torchrun --nnodes=1 --nproc_per_node=2 --rdzv_id=100 --rdzv_backend=c10d \
    --rdzv_endpoint=localhost:29400 test_decorator.py
"""

import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import transforms, datasets
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR

from itwinai.backend.torch.trainer import distributed


class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=0)


def train(model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # sum up batch loss
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            # get the index of the max log-probability
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))


@distributed
def train_func(
    model, train_dataloader, validation_dataloader, device,
    optimizer, scheduler, epochs=10
):
    for epoch in range(1, epochs + 1):
        train(model, device, train_dataloader, optimizer, epoch)
        test(model, device, validation_dataloader)
        scheduler.step()


if __name__ == '__main__':

    train_set = datasets.MNIST(
        '.tmp/', train=True, download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ]))
    val_set = datasets.MNIST(
        '.tmp/', train=False, download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ]))
    model = Net()
    train_dataloader = DataLoader(train_set, batch_size=32, pin_memory=True)
    validation_dataloader = DataLoader(val_set, batch_size=32, pin_memory=True)
    optimizer = optim.Adadelta(model.parameters(), lr=1e-3)
    scheduler = StepLR(optimizer, step_size=1, gamma=0.9)

    # Train distributed
    train_func(model, train_dataloader, validation_dataloader, 'cuda',
               optimizer, scheduler=scheduler)
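The distributed decorator is defined in ai/src/itwinai/backend/torch/trainer.py, whose diff is not rendered on this page, so only its call-side contract is visible here: it wraps a function of (model, train_dataloader, validation_dataloader, device, ...) and runs it under torchrun. A speculative sketch of what such a decorator typically does, following the standard DDP-under-torchrun recipe (this is an assumption, not the committed implementation):

import functools
import os

import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP


def distributed(func):
    """Hypothetical sketch: wrap a train function so it runs under DDP."""
    @functools.wraps(func)
    def wrapper(model, train_dataloader, validation_dataloader,
                device, *args, **kwargs):
        # torchrun sets RANK/WORLD_SIZE/LOCAL_RANK in the environment.
        dist.init_process_group(backend='nccl')
        local_rank = int(os.environ['LOCAL_RANK'])
        ddp_model = DDP(model.to(local_rank), device_ids=[local_rank])
        try:
            func(ddp_model, train_dataloader, validation_dataloader,
                 local_rank, *args, **kwargs)
        finally:
            dist.destroy_process_group()
    return wrapper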
61 changes: 61 additions & 0 deletions ai/src/itwinai/backend/torch/test_trainer.py
@@ -0,0 +1,61 @@
"""
Test Trainer class. To run this script, use the following command:
>>> torchrun --nnodes=1 --nproc_per_node=2 --rdzv_id=100 --rdzv_backend=c10d \
    --rdzv_endpoint=localhost:29400 test_trainer.py
"""

from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import transforms, datasets

from itwinai.backend.torch.trainer import TorchTrainer


class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=0)


if __name__ == '__main__':
    train_set = datasets.MNIST(
        '.tmp/', train=True, download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ]))
    val_set = datasets.MNIST(
        '.tmp/', train=False, download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ]))
    trainer = TorchTrainer(
        model=Net(),
        train_dataloader=DataLoader(train_set, batch_size=32, pin_memory=True),
        validation_dataloader=DataLoader(
            val_set, batch_size=32, pin_memory=True),
        strategy='ddp',
        backend='nccl',
        loss='NLLLoss',
        epochs=20,
        checkpoint_every=1
    )
    trainer.train()
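TorchTrainer also lives in the unrendered trainer.py diff, so only the surface exercised by this test is certain: the constructor arguments above and a train() method. A speculative skeleton consistent with that call (an assumption about the interface, not the committed code):

import torch.nn as nn


class TorchTrainer:
    """Hypothetical skeleton inferred from the constructor call above."""

    def __init__(self, model, train_dataloader, validation_dataloader=None,
                 strategy='ddp', backend='nccl', loss='NLLLoss',
                 epochs=10, checkpoint_every=1):
        self.model = model
        self.train_dataloader = train_dataloader
        self.validation_dataloader = validation_dataloader
        self.strategy = strategy                # e.g. 'ddp'
        self.backend = backend                  # e.g. 'nccl'
        # Resolve the loss by name: 'NLLLoss' -> torch.nn.NLLLoss()
        self.loss_fn = getattr(nn, loss)()
        self.epochs = epochs
        self.checkpoint_every = checkpoint_every

    def train(self):
        # Per-epoch DDP training loop with periodic checkpointing would go
        # here; omitted because the real diff is not shown on this page.
        raise NotImplementedError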