diff --git a/tests/algorithms/test_algorithm_resumption.py b/tests/algorithms/test_algorithm_resumption.py index e0b14ec198..6d6145cf62 100644 --- a/tests/algorithms/test_algorithm_resumption.py +++ b/tests/algorithms/test_algorithm_resumption.py @@ -28,94 +28,102 @@ def test_algorithm_resumption( alg_cls: type[Algorithm], world_size, ): - folder1 = os.path.join(tmp_path, 'folder1') - folder2 = os.path.join(tmp_path, 'folder2') - os.makedirs(folder1, exist_ok=True) - os.makedirs(folder2, exist_ok=True) - - model = get_alg_model(alg_cls) - alg_kwargs = get_alg_kwargs(alg_cls) - - copied_model = copy.deepcopy(model) # copy the model so the params will start from the same point - - if alg_cls is LayerFreezing: - pytest.xfail('Known issues') - - if alg_cls in (SAM, StochasticDepth): - pytest.xfail('Mismatch in weights when resuming from a checkpoint.') - - if alg_cls is GyroDropout: - pytest.xfail('GyroDropoutLayer is not implemented in a way that allows correct resumption.') - - if alg_cls is SWA and world_size > 1: - pytest.xfail('SWA is not implemented in a way that is compatible correct resumption on multiple devices.') - - optimizer = torch.optim.Adam(model.parameters(), lr=0.01) - scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5) - - shared_config = { - 'max_duration': '2ep', - 'save_filename': 'ep{epoch}-rank{rank}', - 'save_interval': '1ep', - 'train_subset_num_batches': 2, - 'precision': 'amp_bf16', - } - train_dataloader = get_alg_dataloader(alg_cls) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True) - # train model once, saving checkpoints every epoch - trainer1 = Trainer( - model=model, - train_dataloader=train_dataloader, - optimizers=optimizer, - schedulers=scheduler, - save_folder=folder1, - algorithms=alg_cls(**alg_kwargs), - **shared_config, - ) - trainer1.fit() - - # create second trainer, load an intermediate checkpoint - # and continue training - - optimizer = torch.optim.Adam(copied_model.parameters(), lr=0.01) - scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5) - - alg = alg_cls(**alg_kwargs) - # SeqLengthWarmup has a call to ._activate_model() that happens on the first call to the algorithm - # in order to get complete matching of the rng state, we have to cause that extra call to be skipped - # when reloading. - if alg_cls is SeqLengthWarmup: - alg._activated = True # type: ignore - - train_dataloader = get_alg_dataloader(alg_cls) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True) - trainer2 = Trainer( - model=copied_model, - train_dataloader=train_dataloader, - load_path=os.path.join(folder1, 'ep1-rank{rank}'), - load_weights_only=False, - load_strict_model_weights=False, - optimizers=optimizer, - schedulers=scheduler, - save_folder=folder2, - algorithms=alg, - **shared_config, - ) - trainer2.fit() - # check that the checkpoints are equal - if world_size == 1 or dist.get_global_rank() == 0: - _assert_checkpoints_equal( - file1=os.path.join(folder1, 'ep2-rank0'), - file2=os.path.join(folder2, 'ep2-rank0'), + # Use RAM-based tmp directory instead of disk + from tempfile import TemporaryDirectory + with TemporaryDirectory() as tmpdir: + folder1 = os.path.join(tmpdir, 'folder1') + folder2 = os.path.join(tmpdir, 'folder2') + os.makedirs(folder1, exist_ok=True) + os.makedirs(folder2, exist_ok=True) + + if alg_cls is LayerFreezing: + pytest.xfail('Known issues') + + if alg_cls in (SAM, StochasticDepth): + pytest.xfail('Mismatch in weights when resuming from a checkpoint.') + + if alg_cls is GyroDropout: + pytest.xfail('GyroDropoutLayer is not implemented in a way that allows correct resumption.') + + if alg_cls is SWA and world_size > 1: + pytest.xfail('SWA is not implemented in a way that is compatible correct resumption on multiple devices.') + + model = get_alg_model(alg_cls) + alg_kwargs = get_alg_kwargs(alg_cls) + + copied_model = copy.deepcopy(model) # copy the model so the params will start from the same point + + optimizer = torch.optim.SGD(model.parameters(), lr=0.1) + scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1) + + # Reduce training duration and data + shared_config = { + 'max_duration': '2ba', + 'save_filename': 'checkpoint_ba{batch}-rank{rank}', + 'save_interval': '1ba', + 'train_subset_num_batches': 2, + 'precision': 'amp_bf16', + } + train_dataloader = get_alg_dataloader( + alg_cls, + ) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True) + # train model once, saving checkpoints every epoch + trainer1 = Trainer( + model=model, + train_dataloader=train_dataloader, + optimizers=optimizer, + schedulers=scheduler, + save_folder=folder1, + algorithms=alg_cls(**alg_kwargs), + **shared_config, ) + trainer1.fit() + + # create second trainer, load an intermediate checkpoint + # and continue training + + optimizer = torch.optim.SGD(copied_model.parameters(), lr=0.1) + scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1) + + alg = alg_cls(**alg_kwargs) + # SeqLengthWarmup has a call to ._activate_model() that happens on the first call to the algorithm + # in order to get complete matching of the rng state, we have to cause that extra call to be skipped + # when reloading. + if alg_cls is SeqLengthWarmup: + alg._activated = True # type: ignore + train_dataloader = get_alg_dataloader( + alg_cls, + ) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True) + + trainer2 = Trainer( + model=copied_model, + train_dataloader=train_dataloader, + load_path=os.path.join(folder1, 'checkpoint_ba1-rank{rank}'), + load_weights_only=False, + load_strict_model_weights=False, + optimizers=optimizer, + schedulers=scheduler, + save_folder=folder2, + algorithms=alg, + **shared_config, + ) + trainer2.fit() - # check that different epoch checkpoints are _not_ equal - # this ensures that the model weights are being updated. - if world_size == 1 or dist.get_global_rank() == 0: - with pytest.raises(AssertionError): - _assert_model_weights_equal( - file1=os.path.join(folder1, 'ep1-rank0'), - file2=os.path.join(folder1, 'ep2-rank0'), + # check that the checkpoints are equal + if world_size == 1 or dist.get_global_rank() == 0: + _assert_checkpoints_equal( + os.path.join(folder1, 'checkpoint_ba2-rank0'), + os.path.join(folder2, 'checkpoint_ba2-rank0'), ) + # check that different epoch checkpoints are _not_ equal + # this ensures that the model weights are being updated. + with pytest.raises(AssertionError): + _assert_model_weights_equal( + os.path.join(folder1, 'checkpoint_ba1-rank0'), + os.path.join(folder1, 'checkpoint_ba2-rank0'), + ) + def _assert_checkpoints_equal(file1, file2): # TODO: consider merging with _assert_checkpoints_equivalent diff --git a/tests/algorithms/test_algorithms_train.py b/tests/algorithms/test_algorithms_train.py index a73a649b70..0f9cb32de2 100644 --- a/tests/algorithms/test_algorithms_train.py +++ b/tests/algorithms/test_algorithms_train.py @@ -18,7 +18,7 @@ def test_algorithm_trains(alg_cls: type[Algorithm]): trainer = Trainer( model=model, train_dataloader=dataloader, - max_duration='2ep', + max_duration='2ba', algorithms=alg_cls(**alg_kwargs), ) trainer.fit() @@ -34,5 +34,5 @@ def test_algorithm_trains(alg_cls: type[Algorithm]): 'GyroDropout is implemented to be applied on Event.FIT_START, so is not compatible with multiple calls to fit.', ) - # fit again for another epoch - trainer.fit(duration='1ep') + # fit again for another batch + trainer.fit(duration='1ba') diff --git a/tests/algorithms/test_gradient_clipping.py b/tests/algorithms/test_gradient_clipping.py index d749362801..eb95c1359a 100644 --- a/tests/algorithms/test_gradient_clipping.py +++ b/tests/algorithms/test_gradient_clipping.py @@ -20,7 +20,7 @@ def simple_model_with_grads(): # Set up small NN with one linear layer with no bias + softmax, so only # one set of params and get some gradients. - N, hin, num_classes = 8, 4, 3 + N, hin, num_classes = 4, 2, 2 x = torch.rand((N, hin)) y = torch.randint(high=num_classes - 1, size=(N,)) model = nn.Sequential(nn.Linear(hin, num_classes, bias=False), nn.Softmax(dim=1)) @@ -47,8 +47,6 @@ def __init__(self, n_ch, num_fmaps, h, num_classes, filter_size): self.mlp = nn.Sequential( nn.Linear(num_fmaps, h), nn.ReLU(), - nn.Linear(h, h), - nn.ReLU(), nn.Linear(h, num_classes), nn.Softmax(dim=1), ) @@ -60,8 +58,8 @@ def forward(self, x): return out # Generate some gradients. - N, n_ch, num_fmaps, h, num_classes, filter_size = 8, 3, 4, 4, 3, 3 - x = torch.rand((N, n_ch, 16, 16)) + N, n_ch, num_fmaps, h, num_classes, filter_size = 4, 1, 2, 2, 2, 2 + x = torch.rand((N, n_ch, 8, 8)) y = torch.randint(high=num_classes - 1, size=(N,)) model = myNN(n_ch, num_fmaps, h, num_classes, filter_size) diff --git a/tests/test_events.py b/tests/test_events.py index 235d0941f1..fe7dd71141 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -1,7 +1,8 @@ -# Copyright 2022 MosaicML Composer authors +# Copyright 2024 MosaicML Composer authors # SPDX-License-Identifier: Apache-2.0 import math +from unittest.mock import patch import pytest import torch @@ -22,27 +23,31 @@ def test_event_values(event: Event): class TestEventCalls: - eval_subset_num_batches = 2 - train_subset_num_batches = 2 + eval_subset_num_batches = 1 + train_subset_num_batches = 1 - def get_trainer(self, precision='fp32', **kwargs): + def get_trainer(self, precision='fp32', max_duration='1ep', save_interval='1ep', **kwargs): model = SimpleModel() optimizer = torch.optim.Adam(model.parameters()) - train_dataset = RandomClassificationDataset() - eval_dataset = RandomClassificationDataset() + train_dataset = RandomClassificationDataset(size=16) + eval_dataset = RandomClassificationDataset(size=16) train_batch_size = 4 evaluator1 = DataLoader( dataset=eval_dataset, batch_size=8, sampler=dist.get_sampler(eval_dataset), + num_workers=0, + drop_last=True, ) evaluator2 = DataLoader( dataset=eval_dataset, batch_size=4, sampler=dist.get_sampler(eval_dataset), + num_workers=0, + drop_last=True, ) return Trainer( @@ -51,13 +56,15 @@ def get_trainer(self, precision='fp32', **kwargs): dataset=train_dataset, batch_size=train_batch_size, sampler=dist.get_sampler(train_dataset), + num_workers=0, ), eval_dataloader=(evaluator1, evaluator2), device_train_microbatch_size=train_batch_size // 2, precision=precision, train_subset_num_batches=self.train_subset_num_batches, eval_subset_num_batches=self.eval_subset_num_batches, - max_duration='2ep', + max_duration=max_duration, + save_interval=save_interval, optimizers=optimizer, callbacks=[EventCounterCallback()], **kwargs, @@ -101,8 +108,41 @@ def get_trainer(self, precision='fp32', **kwargs): ) @pytest.mark.parametrize('save_interval', ['1ep', '1ba']) def test_event_calls(self, world_size, device, deepspeed_zero_stage, use_fsdp, precision, save_interval): - save_interval = Time.from_timestring(save_interval) - + # handle 1ba save interval separately to optimize speed + if save_interval == '1ba': + # mock the save_checkpoint method to speed up batch saves + with patch('composer.trainer.trainer.Trainer.save_checkpoint') as mock_save: + mock_save.return_value = None + self._run_event_calls_test( + world_size, + device, + deepspeed_zero_stage, + use_fsdp, + precision, + save_interval, + num_epochs=1, + ) + else: + self._run_event_calls_test( + world_size, + device, + deepspeed_zero_stage, + use_fsdp, + precision, + save_interval, + num_epochs=1, + ) + + def _run_event_calls_test( + self, + world_size, + device, + deepspeed_zero_stage, + use_fsdp, + precision, + save_interval, + num_epochs, + ): deepspeed_config = None if deepspeed_zero_stage: deepspeed_config = {'zero_optimization': {'stage': deepspeed_zero_stage}} @@ -123,11 +163,11 @@ def test_event_calls(self, world_size, device, deepspeed_zero_stage, use_fsdp, p deepspeed_config=deepspeed_config, parallelism_config=parallelism_config, save_interval=save_interval, - eval_interval=save_interval, + eval_interval=Time.from_timestring(save_interval), ) trainer.fit() - self._assert_expected_event_calls(trainer, save_interval, num_epochs=2) + self._assert_expected_event_calls(trainer, Time.from_timestring(save_interval), num_epochs=num_epochs) def _assert_expected_event_calls(self, trainer: Trainer, eval_interval: Time, num_epochs: int): state = trainer.state diff --git a/tests/test_full_nlp.py b/tests/test_full_nlp.py index 14380b38fe..a3f342500b 100644 --- a/tests/test_full_nlp.py +++ b/tests/test_full_nlp.py @@ -9,6 +9,7 @@ from packaging import version from torch.utils.data import DataLoader from torchmetrics.classification import MulticlassAccuracy +from transformers import BertConfig, BertForMaskedLM, BertForSequenceClassification, BertTokenizerFast from composer.algorithms import GatedLinearUnits from composer.loggers import RemoteUploaderDownloader @@ -35,22 +36,22 @@ def pretraining_test_helper(tokenizer, model, algorithms, tmp_path, device): pretraining_model_copy = copy.deepcopy(model) pretraining_train_dataset = RandomTextLMDataset( - size=8, + size=4, vocab_size=tokenizer.vocab_size, - sequence_length=4, + sequence_length=2, use_keys=True, ) collator = transformers.DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm_probability=0.15) pretraining_train_dataloader = DataLoader( pretraining_train_dataset, - batch_size=4, + batch_size=2, sampler=dist.get_sampler(pretraining_train_dataset), collate_fn=collator, ) pretraining_eval_dataloader = DataLoader( pretraining_train_dataset, - batch_size=4, + batch_size=2, sampler=dist.get_sampler(pretraining_train_dataset), collate_fn=collator, ) @@ -59,7 +60,7 @@ def pretraining_test_helper(tokenizer, model, algorithms, tmp_path, device): model=pretraining_model_copy, train_dataloader=pretraining_train_dataloader, save_folder=str(tmp_path / 'pretraining_checkpoints'), - max_duration='1ep', + max_duration='1ba', seed=17, algorithms=algorithms, device=device, @@ -91,20 +92,20 @@ def finetuning_test_helper(tokenizer, model, algorithms, checkpoint_path, pretra finetuning_model_copy = copy.deepcopy(model) finetuning_train_dataset = RandomTextClassificationDataset( - size=8, + size=4, vocab_size=tokenizer.vocab_size, - sequence_length=4, + sequence_length=2, num_classes=3, use_keys=isinstance(model, HuggingFaceModel), ) finetuning_train_dataloader = DataLoader( finetuning_train_dataset, - batch_size=4, + batch_size=2, sampler=dist.get_sampler(finetuning_train_dataset), ) finetuning_eval_dataloader = DataLoader( finetuning_train_dataset, - batch_size=4, + batch_size=2, sampler=dist.get_sampler(finetuning_train_dataset), ) @@ -137,7 +138,7 @@ def finetuning_test_helper(tokenizer, model, algorithms, checkpoint_path, pretra load_weights_only=True, load_strict_model_weights=False, loggers=[rud], - max_duration='1ep', + max_duration='1ba', seed=17, algorithms=algorithms, device=device, @@ -229,7 +230,6 @@ def inference_test_helper( @device('cpu', 'gpu') -# Note: the specificity of these settings are due to incompatibilities (e.g. the simpletransformer model is not traceable) @pytest.mark.parametrize( 'model_type,algorithms,save_format', [ @@ -242,10 +242,8 @@ def test_full_nlp_pipeline( model_type, algorithms, save_format, - tiny_bert_tokenizer, onnx_opset_version, tmp_path, - request, device, ): """This test is intended to exercise our full pipeline for NLP. @@ -260,29 +258,33 @@ def test_full_nlp_pipeline( pytest.skip("Don't test prior PyTorch version's default Opset version.") algorithms = [algorithm() for algorithm in algorithms] - device = get_device(device) - - tiny_bert_model = None - if model_type == 'tinybert_hf': - tiny_bert_model = request.getfixturevalue('tiny_bert_model') - - # pretraining + config = None + tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased', model_max_length=128) if model_type == 'tinybert_hf': - assert tiny_bert_model is not None + # Updated minimal BERT configuration + config = BertConfig( + vocab_size=30522, + hidden_size=16, + num_hidden_layers=2, + num_attention_heads=2, + intermediate_size=64, + num_labels=3, + ) + tiny_bert_model = BertForMaskedLM(config) pretraining_metrics = [LanguageCrossEntropy(ignore_index=-100), MaskedAccuracy(ignore_index=-100)] pretraining_model = HuggingFaceModel( tiny_bert_model, - tiny_bert_tokenizer, + tokenizer, use_logits=True, metrics=pretraining_metrics, ) elif model_type == 'simpletransformer': - pretraining_model = SimpleTransformerMaskedLM(vocab_size=tiny_bert_tokenizer.vocab_size) + pretraining_model = SimpleTransformerMaskedLM(vocab_size=30522) else: raise ValueError('Unsupported model type') pretraining_output_path = pretraining_test_helper( - tiny_bert_tokenizer, + tokenizer, pretraining_model, algorithms, tmp_path, @@ -292,25 +294,23 @@ def test_full_nlp_pipeline( # finetuning if model_type == 'tinybert_hf': finetuning_metric = MulticlassAccuracy(num_classes=3, average='micro') - hf_finetuning_model, _ = HuggingFaceModel.hf_from_composer_checkpoint( - pretraining_output_path, - model_instantiation_class='transformers.AutoModelForSequenceClassification', - model_config_kwargs={'num_labels': 3}, - ) finetuning_model = HuggingFaceModel( - model=hf_finetuning_model, - tokenizer=tiny_bert_tokenizer, + model=BertForSequenceClassification(config), + tokenizer=tokenizer, use_logits=True, metrics=[finetuning_metric], ) elif model_type == 'simpletransformer': - finetuning_model = SimpleTransformerClassifier(vocab_size=tiny_bert_tokenizer.vocab_size, num_classes=3) + finetuning_model = SimpleTransformerClassifier( + vocab_size=30522, + num_classes=3, + ) else: raise ValueError('Unsupported model type.') finetuning_model_copy = copy.deepcopy(finetuning_model) finetuning_trainer, finetuning_dataloader, rud, finetuning_output_path = finetuning_test_helper( - tiny_bert_tokenizer, + tokenizer, finetuning_model, algorithms, pretraining_output_path,