Speed up CI tests :) #3727

Merged: 58 commits, Dec 2, 2024

Commits
25898a8  smaller stochastic depth (Nov 29, 2024)
7f5f3cd  pre-commit (Nov 29, 2024)
eafe347  lessen batches, fix block (Nov 29, 2024)
dd39ec3  epochs to batches, also revert trying to make custom resnet (Nov 29, 2024)
8d1bd92  test (Nov 29, 2024)
a386263  nlp test reduction (Nov 30, 2024)
cff11cf  save copy (Nov 30, 2024)
8e84544  dataset (Nov 30, 2024)
0d70004  smaller models (Nov 30, 2024)
0952cc6  precommit (Nov 30, 2024)
5b0f0a0  make model bigger (Nov 30, 2024)
a46bcc5  rm gated (Nov 30, 2024)
53da8a8  layerfreezing (Nov 30, 2024)
d87a687  new algorithm (Nov 30, 2024)
eb7ffd4  precommit (Nov 30, 2024)
f150c14  precommit (Nov 30, 2024)
8862b54  precommit (Nov 30, 2024)
86373d1  precommit (Nov 30, 2024)
57e6b27  precommit (Nov 30, 2024)
b20e7fd  algo (Nov 30, 2024)
e2f9cc5  algo (Nov 30, 2024)
568596b  algo (Nov 30, 2024)
7772f96  algo (Nov 30, 2024)
3cfcb4c  algo (Nov 30, 2024)
0736c58  smaller (Nov 30, 2024)
f2317ce  smol vocab size (Nov 30, 2024)
f1fbfbf  smol vocab size (Nov 30, 2024)
4b5c590  tiniest model (Nov 30, 2024)
3277cba  tiniest model (Nov 30, 2024)
e638994  revert (Nov 30, 2024)
e5170d4  speedy resume (Nov 30, 2024)
e3b34da  fix pathing (Nov 30, 2024)
324224e  bf16 (Nov 30, 2024)
57b9110  faster precision (Nov 30, 2024)
0b13105  1 batch (Nov 30, 2024)
3fd229a  epocj (Nov 30, 2024)
d43f6a0  smaller gradient clipping (Nov 30, 2024)
4ca7bbb  speed up fsdp (Nov 30, 2024)
17cbd20  reduce batchsize (Nov 30, 2024)
3c826e6  typo (Nov 30, 2024)
62f0bea  revert (Nov 30, 2024)
69f13ba  update num epochs (Nov 30, 2024)
7b69fc3  one epoch only (Nov 30, 2024)
868a71b  revery (Nov 30, 2024)
ceb6591  precommit (Dec 1, 2024)
6785988  clean (Dec 1, 2024)
ce2362b  clean (Dec 1, 2024)
a4afdc0  clean (Dec 1, 2024)
e595c52  mock (Dec 1, 2024)
b72ad75  precommit (Dec 1, 2024)
477b597  precommit (Dec 1, 2024)
b527187  precommit (Dec 1, 2024)
12955d1  mock and reuse (Dec 1, 2024)
e0a87af  mock and reuse (Dec 1, 2024)
8910f25  mock and reuse (Dec 1, 2024)
234b55f  mock and reuse (Dec 1, 2024)
a255e90  revert (Dec 1, 2024)
91bbd25  revert (Dec 1, 2024)

Files changed

176 changes: 92 additions & 84 deletions tests/algorithms/test_algorithm_resumption.py
@@ -28,94 +28,102 @@ def test_algorithm_resumption(
alg_cls: type[Algorithm],
world_size,
):
folder1 = os.path.join(tmp_path, 'folder1')
folder2 = os.path.join(tmp_path, 'folder2')
os.makedirs(folder1, exist_ok=True)
os.makedirs(folder2, exist_ok=True)

model = get_alg_model(alg_cls)
alg_kwargs = get_alg_kwargs(alg_cls)

copied_model = copy.deepcopy(model) # copy the model so the params will start from the same point

if alg_cls is LayerFreezing:
pytest.xfail('Known issues')

if alg_cls in (SAM, StochasticDepth):
pytest.xfail('Mismatch in weights when resuming from a checkpoint.')

if alg_cls is GyroDropout:
pytest.xfail('GyroDropoutLayer is not implemented in a way that allows correct resumption.')

if alg_cls is SWA and world_size > 1:
pytest.xfail('SWA is not implemented in a way that is compatible with correct resumption on multiple devices.')

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5)

shared_config = {
'max_duration': '2ep',
'save_filename': 'ep{epoch}-rank{rank}',
'save_interval': '1ep',
'train_subset_num_batches': 2,
'precision': 'amp_bf16',
}
train_dataloader = get_alg_dataloader(alg_cls) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True)
# train model once, saving checkpoints every epoch
trainer1 = Trainer(
model=model,
train_dataloader=train_dataloader,
optimizers=optimizer,
schedulers=scheduler,
save_folder=folder1,
algorithms=alg_cls(**alg_kwargs),
**shared_config,
)
trainer1.fit()

# create second trainer, load an intermediate checkpoint
# and continue training

optimizer = torch.optim.Adam(copied_model.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5)

alg = alg_cls(**alg_kwargs)
# SeqLengthWarmup has a call to ._activate_model() that happens on the first call to the algorithm
# in order to get complete matching of the rng state, we have to cause that extra call to be skipped
# when reloading.
if alg_cls is SeqLengthWarmup:
alg._activated = True # type: ignore

train_dataloader = get_alg_dataloader(alg_cls) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True)
trainer2 = Trainer(
model=copied_model,
train_dataloader=train_dataloader,
load_path=os.path.join(folder1, 'ep1-rank{rank}'),
load_weights_only=False,
load_strict_model_weights=False,
optimizers=optimizer,
schedulers=scheduler,
save_folder=folder2,
algorithms=alg,
**shared_config,
)
trainer2.fit()
# check that the checkpoints are equal
if world_size == 1 or dist.get_global_rank() == 0:
_assert_checkpoints_equal(
file1=os.path.join(folder1, 'ep2-rank0'),
file2=os.path.join(folder2, 'ep2-rank0'),
# Use RAM-based tmp directory instead of disk
from tempfile import TemporaryDirectory
with TemporaryDirectory() as tmpdir:
folder1 = os.path.join(tmpdir, 'folder1')
folder2 = os.path.join(tmpdir, 'folder2')
os.makedirs(folder1, exist_ok=True)
os.makedirs(folder2, exist_ok=True)

if alg_cls is LayerFreezing:
pytest.xfail('Known issues')

if alg_cls in (SAM, StochasticDepth):
pytest.xfail('Mismatch in weights when resuming from a checkpoint.')

if alg_cls is GyroDropout:
pytest.xfail('GyroDropoutLayer is not implemented in a way that allows correct resumption.')

if alg_cls is SWA and world_size > 1:
pytest.xfail('SWA is not implemented in a way that is compatible with correct resumption on multiple devices.')

model = get_alg_model(alg_cls)
alg_kwargs = get_alg_kwargs(alg_cls)

copied_model = copy.deepcopy(model) # copy the model so the params will start from the same point

optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1)

# Reduce training duration and data
shared_config = {
'max_duration': '2ba',
'save_filename': 'checkpoint_ba{batch}-rank{rank}',
'save_interval': '1ba',
'train_subset_num_batches': 2,
'precision': 'amp_bf16',
}
train_dataloader = get_alg_dataloader(
alg_cls,
) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True)
# train model once, saving checkpoints every epoch
trainer1 = Trainer(
model=model,
train_dataloader=train_dataloader,
optimizers=optimizer,
schedulers=scheduler,
save_folder=folder1,
algorithms=alg_cls(**alg_kwargs),
**shared_config,
)
trainer1.fit()

# create second trainer, load an intermediate checkpoint
# and continue training

optimizer = torch.optim.SGD(copied_model.parameters(), lr=0.1)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1)

alg = alg_cls(**alg_kwargs)
# SeqLengthWarmup has a call to ._activate_model() that happens on the first call to the algorithm
# in order to get complete matching of the rng state, we have to cause that extra call to be skipped
# when reloading.
if alg_cls is SeqLengthWarmup:
alg._activated = True # type: ignore
train_dataloader = get_alg_dataloader(
alg_cls,
) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True)

trainer2 = Trainer(
model=copied_model,
train_dataloader=train_dataloader,
load_path=os.path.join(folder1, 'checkpoint_ba1-rank{rank}'),
load_weights_only=False,
load_strict_model_weights=False,
optimizers=optimizer,
schedulers=scheduler,
save_folder=folder2,
algorithms=alg,
**shared_config,
)
trainer2.fit()

# check that different epoch checkpoints are _not_ equal
# this ensures that the model weights are being updated.
if world_size == 1 or dist.get_global_rank() == 0:
with pytest.raises(AssertionError):
_assert_model_weights_equal(
file1=os.path.join(folder1, 'ep1-rank0'),
file2=os.path.join(folder1, 'ep2-rank0'),
# check that the checkpoints are equal
if world_size == 1 or dist.get_global_rank() == 0:
_assert_checkpoints_equal(
os.path.join(folder1, 'checkpoint_ba2-rank0'),
os.path.join(folder2, 'checkpoint_ba2-rank0'),
)

# check that different epoch checkpoints are _not_ equal
# this ensures that the model weights are being updated.
with pytest.raises(AssertionError):
_assert_model_weights_equal(
os.path.join(folder1, 'checkpoint_ba1-rank0'),
os.path.join(folder1, 'checkpoint_ba2-rank0'),
)


def _assert_checkpoints_equal(file1, file2):
# TODO: consider merging with _assert_checkpoints_equivalent
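
The body of _assert_checkpoints_equal is collapsed in this view. As a rough illustration of what such a comparison involves (not the repository's actual helper; the recursion and tolerance choices below are assumptions), a minimal version could load both checkpoint files onto the CPU and compare them recursively:

# Minimal sketch, not the repository's helper: assumes each checkpoint file is
# a torch.save'd nested structure of dicts, lists, tensors, and scalars.
import torch

def _assert_checkpoints_equal_sketch(file1, file2):
    ckpt1 = torch.load(file1, map_location='cpu')
    ckpt2 = torch.load(file2, map_location='cpu')
    _assert_same(ckpt1, ckpt2)

def _assert_same(a, b, path='checkpoint'):
    # Recurse through containers; compare tensors with a tolerance-aware check.
    if isinstance(a, dict):
        assert a.keys() == b.keys(), f'key mismatch at {path}'
        for k in a:
            _assert_same(a[k], b[k], f'{path}.{k}')
    elif isinstance(a, (list, tuple)):
        assert len(a) == len(b), f'length mismatch at {path}'
        for i, (x, y) in enumerate(zip(a, b)):
            _assert_same(x, y, f'{path}[{i}]')
    elif isinstance(a, torch.Tensor):
        torch.testing.assert_close(a, b, msg=f'tensor mismatch at {path}')
    else:
        assert a == b, f'value mismatch at {path}'
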
6 changes: 3 additions & 3 deletions tests/algorithms/test_algorithms_train.py
@@ -18,7 +18,7 @@ def test_algorithm_trains(alg_cls: type[Algorithm]):
trainer = Trainer(
model=model,
train_dataloader=dataloader,
max_duration='2ep',
max_duration='2ba',
algorithms=alg_cls(**alg_kwargs),
)
trainer.fit()
@@ -34,5 +34,5 @@ def test_algorithm_trains(alg_cls: type[Algorithm]):
'GyroDropout is implemented to be applied on Event.FIT_START, so is not compatible with multiple calls to fit.',
)

# fit again for another epoch
trainer.fit(duration='1ep')
# fit again for another batch
trainer.fit(duration='1ba')
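
The switch from '2ep'/'1ep' to '2ba'/'1ba' is the entire speedup in this file: a batch-denominated duration runs a fixed number of optimizer steps instead of full passes over the dataset. A small sketch of how these time strings parse, using Composer's Time API that this PR also touches in tests/test_events.py (the assertions here are illustrative, not part of the diff):

from composer.core import Time, TimeUnit

two_batches = Time.from_timestring('2ba')  # two optimizer steps
two_epochs = Time.from_timestring('2ep')   # two full dataset passes

assert two_batches.unit == TimeUnit.BATCH and two_batches.value == 2
assert two_epochs.unit == TimeUnit.EPOCH and two_epochs.value == 2
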
8 changes: 3 additions & 5 deletions tests/algorithms/test_gradient_clipping.py
@@ -20,7 +20,7 @@
def simple_model_with_grads():
# Set up small NN with one linear layer with no bias + softmax, so only
# one set of params and get some gradients.
N, hin, num_classes = 8, 4, 3
N, hin, num_classes = 4, 2, 2
x = torch.rand((N, hin))
y = torch.randint(high=num_classes - 1, size=(N,))
model = nn.Sequential(nn.Linear(hin, num_classes, bias=False), nn.Softmax(dim=1))
@@ -47,8 +47,6 @@ def __init__(self, n_ch, num_fmaps, h, num_classes, filter_size):
self.mlp = nn.Sequential(
nn.Linear(num_fmaps, h),
nn.ReLU(),
nn.Linear(h, h),
nn.ReLU(),
nn.Linear(h, num_classes),
nn.Softmax(dim=1),
)
@@ -60,8 +58,8 @@ def forward(self, x):
return out

# Generate some gradients.
N, n_ch, num_fmaps, h, num_classes, filter_size = 8, 3, 4, 4, 3, 3
x = torch.rand((N, n_ch, 16, 16))
N, n_ch, num_fmaps, h, num_classes, filter_size = 4, 1, 2, 2, 2, 2
x = torch.rand((N, n_ch, 8, 8))
y = torch.randint(high=num_classes - 1, size=(N,))
model = myNN(n_ch, num_fmaps, h, num_classes, filter_size)

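
These fixtures exist only to produce a model whose parameters carry gradients, so shrinking the layer sizes changes nothing about what the clipping code sees, only how long the forward/backward pass takes. A minimal sketch of the pattern (the sizes, function name, and loss choice below are illustrative, not the repo's exact fixture):

import torch
import torch.nn as nn
import torch.nn.functional as F

def tiny_model_with_grads():
    # Build a tiny linear+softmax model, run one forward/backward pass so every
    # parameter has a populated .grad, then return it for the clipping tests.
    N, hin, num_classes = 4, 2, 2
    model = nn.Sequential(nn.Linear(hin, num_classes, bias=False), nn.Softmax(dim=1))
    x = torch.rand((N, hin))
    y = torch.randint(high=num_classes, size=(N,))
    loss = F.cross_entropy(model(x), y)
    loss.backward()
    return model

# Example: clip the gradients the fixture produced.
model = tiny_model_with_grads()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
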
62 changes: 51 additions & 11 deletions tests/test_events.py
@@ -1,7 +1,8 @@
# Copyright 2022 MosaicML Composer authors
# Copyright 2024 MosaicML Composer authors
# SPDX-License-Identifier: Apache-2.0

import math
from unittest.mock import patch

import pytest
import torch
@@ -22,27 +23,31 @@ def test_event_values(event: Event):

class TestEventCalls:

eval_subset_num_batches = 2
train_subset_num_batches = 2
eval_subset_num_batches = 1
train_subset_num_batches = 1

def get_trainer(self, precision='fp32', **kwargs):
def get_trainer(self, precision='fp32', max_duration='1ep', save_interval='1ep', **kwargs):
model = SimpleModel()
optimizer = torch.optim.Adam(model.parameters())

train_dataset = RandomClassificationDataset()
eval_dataset = RandomClassificationDataset()
train_dataset = RandomClassificationDataset(size=16)
eval_dataset = RandomClassificationDataset(size=16)
train_batch_size = 4

evaluator1 = DataLoader(
dataset=eval_dataset,
batch_size=8,
sampler=dist.get_sampler(eval_dataset),
num_workers=0,
drop_last=True,
)

evaluator2 = DataLoader(
dataset=eval_dataset,
batch_size=4,
sampler=dist.get_sampler(eval_dataset),
num_workers=0,
drop_last=True,
)

return Trainer(
@@ -51,13 +56,15 @@ def get_trainer(self, precision='fp32', **kwargs):
dataset=train_dataset,
batch_size=train_batch_size,
sampler=dist.get_sampler(train_dataset),
num_workers=0,
),
eval_dataloader=(evaluator1, evaluator2),
device_train_microbatch_size=train_batch_size // 2,
precision=precision,
train_subset_num_batches=self.train_subset_num_batches,
eval_subset_num_batches=self.eval_subset_num_batches,
max_duration='2ep',
max_duration=max_duration,
save_interval=save_interval,
optimizers=optimizer,
callbacks=[EventCounterCallback()],
**kwargs,
@@ -101,8 +108,41 @@ def get_trainer(self, precision='fp32', **kwargs):
)
@pytest.mark.parametrize('save_interval', ['1ep', '1ba'])
def test_event_calls(self, world_size, device, deepspeed_zero_stage, use_fsdp, precision, save_interval):
save_interval = Time.from_timestring(save_interval)

# handle 1ba save interval separately to optimize speed
if save_interval == '1ba':
# mock the save_checkpoint method to speed up batch saves
with patch('composer.trainer.trainer.Trainer.save_checkpoint') as mock_save:
mock_save.return_value = None
self._run_event_calls_test(
world_size,
device,
deepspeed_zero_stage,
use_fsdp,
precision,
save_interval,
num_epochs=1,
)
else:
self._run_event_calls_test(
world_size,
device,
deepspeed_zero_stage,
use_fsdp,
precision,
save_interval,
num_epochs=1,
)

def _run_event_calls_test(
self,
world_size,
device,
deepspeed_zero_stage,
use_fsdp,
precision,
save_interval,
num_epochs,
):
deepspeed_config = None
if deepspeed_zero_stage:
deepspeed_config = {'zero_optimization': {'stage': deepspeed_zero_stage}}
@@ -123,11 +163,11 @@ def test_event_calls(self, world_size, device, deepspeed_zero_stage, use_fsdp, p
deepspeed_config=deepspeed_config,
parallelism_config=parallelism_config,
save_interval=save_interval,
eval_interval=save_interval,
eval_interval=Time.from_timestring(save_interval),
)
trainer.fit()

self._assert_expected_event_calls(trainer, save_interval, num_epochs=2)
self._assert_expected_event_calls(trainer, Time.from_timestring(save_interval), num_epochs=num_epochs)

def _assert_expected_event_calls(self, trainer: Trainer, eval_interval: Time, num_epochs: int):
state = trainer.state
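
The '1ba' branch above is the "mock" half of the "mock and reuse" commits: per-batch checkpointing would otherwise serialize trainer state on every step, so the test patches the save path away while still exercising the event logic. A stripped-down sketch of the same pattern (the helper name and the call-count assertion are illustrative, not part of the PR; the patch target matches the one used above):

from unittest.mock import patch

def fit_without_checkpoint_io(build_trainer):
    # build_trainer is assumed to return a Composer Trainer configured with
    # save_interval='1ba'.
    with patch('composer.trainer.trainer.Trainer.save_checkpoint') as mock_save:
        mock_save.return_value = None
        trainer = build_trainer()
        trainer.fit()
        # The save hook still fires each batch; only the disk write is skipped.
        assert mock_save.call_count >= 1
        return trainer
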