diff --git a/README.md b/README.md index c1fee8acf0..7d4144821a 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ The heart of Composer is our Trainer abstraction: a highly optimized PyTorch tra Whether you’re training on 1 GPU or 512 GPUs, 50MB or 10TB of data - Composer is built to keep your workflow simple. -- [**FSDP**](https://docs.mosaicml.com/projects/composer/en/stable/notes/distributed_training.html#fullyshardeddataparallel-fsdp): For large models that are too large to fit on GPUs, Composer has integrated PyTorch [FullyShardedDataParallelism](https://docs.mosaicml.com/projects/composer/en/stable/notes/distributed_training.html#fullyshardeddataparallel-fsdp) into our trainer and made it simple to efficiently parallelize custom models. We’ve found FSDP is competitive performance-wise with much more complex parallelism strategies. Alternatively, Composer also supports standard PyTorch distributed data parallelism (DDP) and Deepspeed execution. +- [**FSDP**](https://docs.mosaicml.com/projects/composer/en/stable/notes/distributed_training.html#fullyshardeddataparallel-fsdp): For large models that are too large to fit on GPUs, Composer has integrated PyTorch [FullyShardedDataParallelism](https://docs.mosaicml.com/projects/composer/en/stable/notes/distributed_training.html#fullyshardeddataparallel-fsdp) into our trainer and made it simple to efficiently parallelize custom models. We’ve found FSDP is competitive performance-wise with much more complex parallelism strategies. Alternatively, Composer also supports standard PyTorch distributed data parallelism (DDP) execution. - [**Elastic sharded checkpointing**](https://docs.mosaicml.com/projects/composer/en/stable/notes/distributed_training.html#saving-and-loading-sharded-checkpoints-with-fsdp): Save on eight GPUs, resume on sixteen. Composer supports elastic sharded checkpointing, so you never have to worry if your sharded saved state is compatible with your new hardware setup. - **Data streaming:** Working with large datasets? Download datasets from cloud blob storage on the fly by integrating with MosaicML [StreamingDataset](https://github.com/mosaicml/streaming) during model training. diff --git a/STYLE_GUIDE.md b/STYLE_GUIDE.md index 16cc94f5ba..1c046a2dc6 100644 --- a/STYLE_GUIDE.md +++ b/STYLE_GUIDE.md @@ -53,8 +53,8 @@ As a general rule of thumb, ```python from typing import Optional - def configure_deepspeed(deepspeed_config: Optional[dict]): - if deepspeed_config is None: + def configure_parallelism(parallelism_config: Optional[dict]): + if parallelism_config is None: # Don't do this check in the callee, which results in a no-op return ... @@ -67,13 +67,13 @@ As a general rule of thumb, ```python from typing import Optional - def configure_deepspeed(deepspeed_config: dict): + def configure_parallelism(parallelism_config: dict): ... - def trainer(deepspeed_config: Optional[dict]): - if deepspeed_config is not None: + def trainer(paralellism_config: Optional[dict]): + if paralellism_config is not None: # Do this check in the caller function - configure_deepspeed(deepspeed_config) + configure_paralellism(paralellism_config) ... ``` @@ -251,20 +251,7 @@ All imports in composer should be absolute -- that is, they do not begin with a an optional dependency is missing. If the corresponding package is not published on Anaconda, then set the ``conda_package`` to the pip package - name, and set ``conda_channel`` to ``None``. For example, with DeepSpeed: - - - ```python - from composer.utils import MissingConditionalImportError - - try: - import deepspeed - except ImportError as e: - raise MissingConditionalImportError(extra_deps_group="deepspeed", - conda_package="deepspeed>=0.5.5", - conda_channel=None) from e - ``` - + name, and set ``conda_channel`` to ``None``. 1. If the dependency is core to Composer, add the dependency to the `install_requires` section of diff --git a/composer/_version.py b/composer/_version.py index df0bb29480..a015337870 100644 --- a/composer/_version.py +++ b/composer/_version.py @@ -3,4 +3,4 @@ """The Composer Version.""" -__version__ = '0.28.0.dev0' +__version__ = '0.29.0.dev0' diff --git a/composer/algorithms/gradient_clipping/gradient_clipping.py b/composer/algorithms/gradient_clipping/gradient_clipping.py index 9968a28643..2a69307bf2 100644 --- a/composer/algorithms/gradient_clipping/gradient_clipping.py +++ b/composer/algorithms/gradient_clipping/gradient_clipping.py @@ -122,10 +122,6 @@ class GradientClipping(Algorithm): to (for 'value'), what values to clip the gradient norms to (for 'norm'), and threshold by which if grad_norm / weight_norm is greater than this threshold then scale gradients by this threshold * (weight_norm / grad_norm) (for 'adaptive'). - - Raises: - NotImplementedError: if deepspeed is enabled and clipping_type is not 'norm'. - ValueError: if deepspeed is enabled and clipping_type is not 'norm'. """ def __init__(self, clipping_type: str, clipping_threshold: float): @@ -136,20 +132,7 @@ def match(self, event: Event, state: State) -> bool: return event in [Event.INIT, Event.AFTER_TRAIN_BATCH] def apply(self, event: Event, state: State, logger: Logger) -> Optional[int]: - if event == Event.INIT and state.deepspeed_config is not None: - if self.clipping_type == 'norm': - if self.clipping_threshold > 0: - state.deepspeed_config['gradient_clipping'] = self.clipping_threshold - else: - raise ValueError( - f'Deepspeed only supports gradient clipping thresholds that are greater than zero, but the provided one is {self.clipping_threshold}', - ) - else: - raise NotImplementedError( - f"Deepspeed only supports gradient clipping of type 'norm' not of type '{self.clipping_type}'", - ) - - if event == Event.AFTER_TRAIN_BATCH and not state.deepspeed_enabled: + if event == Event.AFTER_TRAIN_BATCH: apply_gradient_clipping( model=state.model, clipping_type=self.clipping_type, diff --git a/composer/algorithms/low_precision_groupnorm/low_precision_groupnorm.py b/composer/algorithms/low_precision_groupnorm/low_precision_groupnorm.py index b140412c04..6b46df99dd 100644 --- a/composer/algorithms/low_precision_groupnorm/low_precision_groupnorm.py +++ b/composer/algorithms/low_precision_groupnorm/low_precision_groupnorm.py @@ -44,7 +44,7 @@ class LowPrecisionGroupNorm(Algorithm): LPGroupNorm is a thin wrapper around :class:`torch.nn.GroupNorm` which forces the layer to run in lower precision (torch.float16 or torch.bfloat16) if autocast is enabled. This algorithm has - no effect in FP32 or DeepSpeed FP16 mode, where autocast is disabled. + no effect in FP32, where autocast is disabled. This algorithm is intended to be used instead of Fused GroupNorm. They have similar behavior and performance. diff --git a/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py b/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py index fd3534aa65..1d33911743 100644 --- a/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py +++ b/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py @@ -44,7 +44,7 @@ class LowPrecisionLayerNorm(Algorithm): LPLayerNorm is a thin wrapper around :class:`torch.nn.LayerNorm` which forces the layer to run in lower precision (torch.float16 or torch.bfloat16) if autocast is enabled. This algorithm has - no effect in FP32 or DeepSpeed FP16 mode, where autocast is disabled. + no effect in FP32, where autocast is disabled. This algorithm is intended to be used instead of Fused LayerNorm. They have similar behavior and performance. diff --git a/composer/callbacks/checkpoint_saver.py b/composer/callbacks/checkpoint_saver.py index 26ea64090a..a4f94d541d 100644 --- a/composer/callbacks/checkpoint_saver.py +++ b/composer/callbacks/checkpoint_saver.py @@ -30,7 +30,6 @@ ensure_folder_has_no_conflicting_files, format_name_with_dist, format_name_with_dist_and_time, - is_model_deepspeed, parse_uri, partial_format, ) @@ -99,19 +98,12 @@ class CheckpointSaver(Callback): # noqa: D101 * By default, only the rank zero process will save a checkpoint file. - * When using DeepSpeed, each rank will save a checkpoint file in tarball format. DeepSpeed - requires tarball format, as it saves model and optimizer states in separate files. - Ensure that ``'{{rank}}'`` appears within the ``filename``. Otherwise, multiple ranks - may attempt to write to the same file(s), leading to corrupted checkpoints. If no tarball file - extension is specified, ``'.tar'`` will be used. - - * To write to compressed tar files (regardless of whether DeepSpeed is enabled), set the file + * To write to compressed tar files, set the file extension to ``'.tar.gz'``, ``'.tgz'``, ``'.tar.bz2'``, or ``'.tar.lzma'`` (depending on the desired compression algorithm). - * To write to compressed pt files (when DeepSpeed is disabled), set the file extension to - ``'.pt.bz2'``, ``'.pt.gz'``, ``'.pt.lz4'``, ``'.pt.lzma'``, ``'.pt.lzo'``, ``'.pt.xz'``, - ``'.pt.zst'`` + * To write to compressed pt files, set the file extension to ``'.pt.bz2'``, ``'.pt.gz'``, + ``'.pt.lz4'``, ``'.pt.lzma'``, ``'.pt.lzo'``, ``'.pt.xz'``, ``'.pt.zst'`` (depending on the desired algorithm). You must have the corresponding CLI tool installed. ``lz4`` is a good choice for a modest space saving while being very fast to compress. @@ -133,15 +125,7 @@ class CheckpointSaver(Callback): # noqa: D101 * The current epoch count is ``1``. * The current batch count is ``42``. - When DeepSpeed is not being used, the rank zero process will save the checkpoint to - ``"awesome-training-run/checkpoints/ep1-ba42-rank0"``. - - When DeepSpeed is being used, each rank (process) will save checkpoints to:: - - awesome-training-run/checkpoints/ep1-ba42-rank0.tar - awesome-training-run/checkpoints/ep1-ba42-rank1.tar - awesome-training-run/checkpoints/ep1-ba42-rank2.tar - ... + The rank zero process will save the checkpoint to ``"awesome-training-run/checkpoints/ep1-ba42-rank0"``. remote_file_name (str, optional): Format string for the checkpoint's remote file name. Default: ``"{{run_name}}/checkpoints/ep{{epoch}}-ba{{batch}}-rank{{rank}}"``. @@ -174,17 +158,9 @@ class CheckpointSaver(Callback): # noqa: D101 * The current epoch count is ``1``. * The current batch count is ``42``. - When DeepSpeed is not being used, the rank zero process will save the checkpoint to - ``'awesome-training-run/checkpoints/ep1-ba42-rank0'``, - and a symlink will be created at - ``'awesome-training-run/checkpoints/latest-rank0' -> 'awesome-training-run/checkpoints/ep1-ba42-rank0'`` - - When DeepSpeed is being used, each rank (process) will save checkpoints to:: - - awesome-training-run/checkpoints/ep1-ba42-rank0.tar - awesome-training-run/checkpoints/ep1-ba42-rank1.tar - awesome-training-run/checkpoints/ep1-ba42-rank2.tar - ... + The rank zero process will save the checkpoint to ``'awesome-training-run/checkpoints/ep1-ba42-rank0'``, + and a symlink will be created at ``'awesome-training-run/checkpoints/latest-rank0' -> + 'awesome-training-run/checkpoints/ep1-ba42-rank0'`` Corresponding symlinks will be created at:: @@ -236,7 +212,7 @@ class CheckpointSaver(Callback): # noqa: D101 remote file systems. weights_only (bool): If ``True``, save only the model weights instead of the entire training state. - This parameter must be ``False`` when using DeepSpeed. Default: ``False``. + Default: ``False``. ignore_keys (list[str] | (dict) -> None, optional): A list of paths for the ``state_dict`` of the checkpoint, which, when provided, will be ignored from the state_dict before a checkpoint is saved. Each path is a list @@ -269,10 +245,7 @@ class CheckpointSaver(Callback): # noqa: D101 .. note:: - When using DeepSpeed, the index of a filepath in each list corresponds to the global rank of - the process that wrote that file. Each filepath is valid only on the process's (rank's) node. - - Otherwise, when not using DeepSpeed, each sub-list will contain only one filepath since only rank zero + Each sub-list will contain only one filepath since only rank zero saves checkpoints. """ @@ -393,9 +366,6 @@ def fit_start(self, state: State, logger: Logger) -> None: dist.barrier() # holds all ranks until folder check is done - if is_model_deepspeed(state.model) and self.weights_only: - raise NotImplementedError('weights_only=True is not supported when using DeepSpeed.') - self.start_batch = state.timestamp.batch def batch_checkpoint(self, state: State, logger: Logger): @@ -466,13 +436,8 @@ def _upload_checkpoint( def _save_checkpoint(self, state: State, logger: Logger): self.last_checkpoint_batch = state.timestamp.batch - is_deepspeed = is_model_deepspeed(state.model) - - if is_deepspeed and '{rank}' not in self.filename.filename: - raise ValueError(f'Save filename {self.filename.filename} must have {{rank}} for deepspeed.') - # save the checkpoint to the filename - filename_with_placeholders = self.filename.format(state, is_deepspeed, keep_placeholders=True) + filename_with_placeholders = self.filename.format(state, keep_placeholders=True) save_filename = checkpoint.get_save_filename(state, filename_with_placeholders) # Store before saving so state_dict in checkpoint has reference to latest checkpoint (itself) self.all_saved_checkpoints_to_timestamp[save_filename] = state.timestamp @@ -505,7 +470,7 @@ def _save_checkpoint(self, state: State, logger: Logger): self.rank_saves_symlinks = dist.get_global_rank() == 0 or not state.fsdp_sharded_state_dict_enabled if self.latest_filename is not None and self.num_checkpoints_to_keep != 0: - symlink = self.latest_filename.format(state, is_deepspeed) + symlink = self.latest_filename.format(state) os.makedirs(os.path.dirname(symlink), exist_ok=True) try: os.remove(symlink) @@ -524,7 +489,6 @@ def _save_checkpoint(self, state: State, logger: Logger): if state.fsdp_sharded_state_dict_enabled: remote_file_name = self.remote_file_name.format( state, - is_deepspeed, keep_placeholders=True, ).lstrip('/') assert state.fsdp_config is not None @@ -549,10 +513,7 @@ def _save_checkpoint(self, state: State, logger: Logger): logger=logger, ) else: - remote_file_name = self.remote_file_name.format( - state, - is_deepspeed, - ).lstrip('/') + remote_file_name = self.remote_file_name.format(state).lstrip('/') log.debug(f'Uploading checkpoint to {remote_file_name}') try: @@ -572,10 +533,7 @@ def _save_checkpoint(self, state: State, logger: Logger): # symlinks stay the same with sharded checkpointing if self.latest_remote_file_name is not None: - symlink_name = self.latest_remote_file_name.format( - state, - is_deepspeed, - ).lstrip('/') + '.symlink' + symlink_name = self.latest_remote_file_name.format(state).lstrip('/') + '.symlink' # create and upload a symlink file symlink_filename = os.path.join( diff --git a/composer/core/algorithm.py b/composer/core/algorithm.py index cd4b59043c..80a8b7bbe4 100644 --- a/composer/core/algorithm.py +++ b/composer/core/algorithm.py @@ -47,10 +47,6 @@ def find_unused_parameters(self) -> bool: For example, it is used to tell :class:`torch.nn.parallel.DistributedDataParallel` (DDP) that some parameters will be frozen during training, and hence it should not expect gradients from them. All algorithms which do any kind of parameter freezing should override this function to return ``True``. - - .. note:: - - DeepSpeed integration with this function returning True is not tested. It may not work as expected. """ return False diff --git a/composer/core/state.py b/composer/core/state.py index 4c1e1a92bb..b6c280d416 100644 --- a/composer/core/state.py +++ b/composer/core/state.py @@ -52,13 +52,10 @@ dist, ensure_tuple, get_composer_env_dict, - is_model_deepspeed, reproducibility, ) if TYPE_CHECKING: - import deepspeed - from composer.core.algorithm import Algorithm from composer.core.callback import Callback from composer.core.evaluator import Evaluator @@ -330,7 +327,6 @@ class State(Serializable): save_metrics (bool, optional): Whether to save metrics in state_dict. algorithms (Algorithm | Sequence[Algorithm], optional): The algorithms used for training. callbacks (Callback | Sequence[Callback], optional): The callbacks used for training. - deepspeed_config (dict[str, Any], optional): The configuration dictionary for deepspeed. parallelism_config (ParallelismConfig, optional): The configuration dictionary for parallelism. Attributes: @@ -397,9 +393,8 @@ class State(Serializable): .. note:: - When using DeepSpeed or multi-rank training, the model will be wrapped with - :class:`~deepspeed.DeepSpeedEngine` or :class:`~torch.nn.parallel.DistributedDataParallel`, - respectively. + When using multi-rank training with DDP, the model will be wrapped with + :class:`~torch.nn.parallel.DistributedDataParallel`. outputs (torch.Tensor | Sequence[torch.Tensor]): The most recently computed output from the model's forward pass. @@ -497,7 +492,6 @@ def __init__( callbacks: Optional[Union[Callback, Sequence[Callback]]] = None, # Distributed training configs - deepspeed_config: Optional[dict[str, Any]] = None, parallelism_config: Optional[ParallelismConfig] = None, ): self.rank_zero_seed = rank_zero_seed @@ -542,7 +536,6 @@ def __init__( self.profiler: Optional[Profiler] = None - self.deepspeed_config = deepspeed_config self.fsdp_config = parallelism_config.fsdp if parallelism_config is not None else None self.tp_config = parallelism_config.tp if parallelism_config is not None else None @@ -880,11 +873,6 @@ def evaluators(self): def evaluators(self, evaluators: Union[Evaluator, Sequence[Evaluator]]): self._evaluators[:] = list(ensure_tuple(evaluators)) - @property - def deepspeed_enabled(self): - """Indicates if deepspeed is enabled.""" - return self.deepspeed_config is not None - @property def fsdp_enabled(self): """Indicates if FSDP is enabled.""" @@ -1765,10 +1753,3 @@ def precision_config(self): def is_model_ddp(self): """Whether :attr:`model` is an instance of a :class:`.DistributedDataParallel`.""" return isinstance(self.model, DistributedDataParallel) - - @property - def deepspeed_model(self) -> deepspeed.DeepSpeedEngine: - """Cast :attr:`model` to :class:`~deepspeed.DeepSpeedEngine`.""" - if is_model_deepspeed(self.model): - return cast('deepspeed.DeepSpeedEngine', self.model) - raise TypeError('state.model is not a DeepSpeed model') diff --git a/composer/distributed/__init__.py b/composer/distributed/__init__.py index ccbf500f34..499972880a 100644 --- a/composer/distributed/__init__.py +++ b/composer/distributed/__init__.py @@ -3,7 +3,6 @@ """Distributed training.""" -from composer.distributed.deepspeed import fix_batch_precision_for_deepspeed, parse_deepspeed_config from composer.distributed.dist_strategy import ( DDPSyncStrategy, ddp_sync_context, @@ -13,8 +12,6 @@ ) __all__ = [ - 'fix_batch_precision_for_deepspeed', - 'parse_deepspeed_config', 'DDPSyncStrategy', 'ddp_sync_context', 'prepare_ddp_module', diff --git a/composer/distributed/deepspeed.py b/composer/distributed/deepspeed.py deleted file mode 100644 index 6df42befce..0000000000 --- a/composer/distributed/deepspeed.py +++ /dev/null @@ -1,182 +0,0 @@ -# Copyright 2022 MosaicML Composer authors -# SPDX-License-Identifier: Apache-2.0 - -"""Helpers for the `DeepSpeed `_ integration with Composer.""" - -import copy -import warnings -from typing import Any, cast - -import torch -import torch.utils.data - -from composer.core import Batch, Precision, State -from composer.utils import dist, map_collection - -__all__ = ['fix_batch_precision_for_deepspeed', 'parse_deepspeed_config'] - - -def _add_batch_config(config: dict[str, Any], state: State): - if state.dataloader is None: - raise ValueError( - 'When using DeepSpeed, the `train_dataloader` must be specified when constructing the Trainer.', - ) - - try: - batch_size = state.dataloader.batch_size # type: ignore as we catch the exception - except AttributeError as e: - raise RuntimeError('DeepSpeed requires the `state.dataloader` to have a `batch_size` attribute.') from e - - assert state.device_train_microbatch_size is not None - if batch_size % state.device_train_microbatch_size != 0: - # DeepSpeed will throw an error in this configuration. - raise ValueError( - 'The Mosaic trainer has been configured to use batch size=' - f'{batch_size}, but this is not divisible by the ' - f'train device microbatch size={state.device_train_microbatch_size}. ' - 'This is unsupported when using DeepSpeed.', - ) - - train_batch_size = batch_size * dist.get_world_size() - # Per the check at the start of this function, the following division is always clean. - grad_accum = batch_size // state.device_train_microbatch_size - - if 'train_batch_size' in config: - ds_train_batch_size = config['train_batch_size'] - if ds_train_batch_size != train_batch_size: - raise ValueError( - f'Provided DeepSpeed configuration specifies batch size={ds_train_batch_size}, ' - f'but the Mosaic trainer has been configured with batch size={train_batch_size}.', - ) - else: - config['train_batch_size'] = train_batch_size - - if 'gradient_accumulation_steps' not in config: - config['gradient_accumulation_steps'] = grad_accum - - if 'train_micro_batch_size_per_gpu' in config: - ds_per_gpu_microbatch_size = config['train_micro_batch_size_per_gpu'] - if ds_per_gpu_microbatch_size != state.device_train_microbatch_size: - raise ValueError( - 'Provided DeepSpeed configuration specifies per-GPU microbatch size=' - f'{ds_per_gpu_microbatch_size}, but the Mosaic trainer has been ' - f'configured with per-GPU microbatch size={state.device_train_microbatch_size}.', - ) - else: - config['train_micro_batch_size_per_gpu'] = state.device_train_microbatch_size - - -def _ensure_no_optim_in_config(config: dict[str, Any]): - if 'optimizer' in config: - raise ValueError(( - 'The DeepSpeed configuration specifies an optimizer, but the Mosaic ' - 'trainer will override this setting.' - )) - - if 'scheduler' in config: - raise ValueError(( - 'The DeepSpeed configuration specifies a scheduler, but the Mosaic ' - 'trainer will override this setting.' - )) - - -def _add_precision_config(config: dict[str, Any], state: State): - precision = state.precision - - # Verify DeepSpeed config is consistent with state.precision if set. DeepSpeed precision config - # has many different ways to specify approximately the same thing. See https://www.deepspeed.ai/docs/config-json/. - ds_precision = None - if 'fp16' in config and 'enabled' in config['fp16'] and config['fp16']['enabled']: - ds_precision = Precision.AMP_FP16 - elif 'bf16' in config and 'enabled' in config['bf16'] and config['bf16']['enabled']: - ds_precision = Precision.AMP_BF16 - elif 'amp' in config and 'enabled' in config['amp'] and config['amp']['enabled']: - ds_precision = Precision.AMP_FP16 - if ds_precision is not None and ds_precision != precision: - raise ValueError(( - f'Provided DeepSpeed configuration specifies precision={ds_precision}, ' - f'but the Mosaic trainer has been configured with precision={precision}.' - )) - - # set DeepSpeed config based on state.precision if not set - if precision == Precision.AMP_FP16 and 'fp16' not in config: - config['fp16'] = cast(dict[str, Any], {'enabled': True}) - elif precision == Precision.AMP_BF16 and 'bf16' not in config: - config['bf16'] = cast(dict[str, Any], {'enabled': True}) - - -def parse_deepspeed_config( - config: dict[str, Any], - state: State, -) -> dict[str, Any]: - """Parses the provided DeepSpeed config for compatibility with the Mosaic trainer. - - Broadly speaking, this function does three things. - - 1. Check for settings that are unsupported, like DeepSpeed optimizers. - - 2. Check for inconsistencies between Mosaic trainer config and DeepSpeed config. - - 3. Use Mosaic trainer config to fill in some defaults for DeepSpeed config. - - Args: - config (dict[str, Any]): The DeepSpeed config to use. Must follow the format specified - in `DeepSpeed's documentation `_. - state (State): The state of the trainer. - - Returns: - dict[str, Any]: The DeepSpeed config updated with values from the arguments passed to the - :class:`.Trainer`. - - Raises: - ValueError: If any of the values in the DeepSpeed config conflict with arguments passed - to the trainer. - RuntimeError: If the batch size of the train dataloader in the provided state is not set. - """ - new_config = copy.deepcopy(config) - _add_batch_config(new_config, state) - _ensure_no_optim_in_config(new_config) - _add_precision_config(new_config, state) - if 'zero_allow_untested_optimizer' in new_config and not new_config['zero_allow_untested_optimizer']: - warnings.warn(( - 'Provided DeepSpeed configuration specifies zero_allow_untested_optimizer=False. ' - 'This causes DeepSpeed to reject certain Mosaic optimizers that are known to ' - 'work well with DeepSpeed.' - )) - - new_config['zero_allow_untested_optimizer'] = True - return new_config - - -def _convert_fp32_tensor_to_fp16(tensor: torch.Tensor): - if tensor.dtype == torch.float32: - return tensor.half() - return tensor - - -def _convert_fp32_tensor_to_bf16(tensor: torch.Tensor): - if tensor.dtype == torch.float32: - return tensor.to(torch.bfloat16) - return tensor - - -def fix_batch_precision_for_deepspeed(batch: Batch, precision: Precision) -> Batch: - """Ensures that a batch is properly formatted for DeepSpeed precisions, if active. - - .. note:: Just because the precision is set to FP16 doesn't mean the entire batch can - be FP16 too. For example, integer tensors are common in inputs and outputs of - various models, and these must not be converted. The assumption here is - that a tensor should only be converted to FP16 if it was given in FP32. - - Args: - batch (Batch): The batch of data to adjust the precision for. - precision (Precision): The precision to use. - - Returns: - Batch: The batch with it's precision adjusted to the specified precision. - """ - if precision == Precision.AMP_FP16: - return map_collection(batch, _convert_fp32_tensor_to_fp16) - elif precision == Precision.AMP_BF16: - return map_collection(batch, _convert_fp32_tensor_to_bf16) - return batch diff --git a/composer/loggers/wandb_logger.py b/composer/loggers/wandb_logger.py index d76ee1fbac..6412114168 100644 --- a/composer/loggers/wandb_logger.py +++ b/composer/loggers/wandb_logger.py @@ -44,9 +44,7 @@ class WandBLogger(LoggerDestination): rank_zero_only (bool, optional): Whether to log only on the rank-zero process. When logging `artifacts `_, it is highly recommended to log on all ranks. Artifacts from ranks ≥1 will not be - stored, which may discard pertinent information. For example, when using - Deepspeed ZeRO, it would be impossible to restore from checkpoints without - artifacts from all ranks (default: ``True``). + stored, which may discard pertinent information (default: ``True``). init_kwargs (dict[str, Any], optional): Any additional init kwargs ``wandb.init`` (see `WandB documentation `_). diff --git a/composer/trainer/trainer.py b/composer/trainer/trainer.py index c39a0b7b83..86008d51a4 100644 --- a/composer/trainer/trainer.py +++ b/composer/trainer/trainer.py @@ -53,7 +53,7 @@ else: from torch.cuda.amp.grad_scaler import GradScaler, _refresh_per_optimizer_state # type: ignore -from composer.callbacks import CheckpointSaver, MemorySnapshot, OOMObserver, OptimizerMonitor +from composer.callbacks import CheckpointSaver, MemorySnapshot, OOMObserver from composer.core import ( Algorithm, AlgorithmPass, @@ -79,8 +79,6 @@ from composer.distributed import ( DDPSyncStrategy, ddp_sync_context, - fix_batch_precision_for_deepspeed, - parse_deepspeed_config, prepare_ddp_module, prepare_fsdp_module, prepare_tp_module, @@ -107,12 +105,10 @@ MLFLOW_RUN_ID_FORMAT_KEY, ExportFormat, FSDPConfig, - MissingConditionalImportError, ObjectStore, ParallelismConfig, TPConfig, Transform, - VersionedDeprecationWarning, checkpoint, dist, ensure_tuple, @@ -122,7 +118,6 @@ get_composer_env_dict, get_device, get_file, - is_model_deepspeed, is_xla_installed, map_collection, maybe_create_object_store_from_uri, @@ -535,17 +530,6 @@ def _get_ddp_sync_strategy(ddp_sync_strategy: Optional[Union[str, DDPSyncStrateg return ddp_sync_strategy -def _get_precision_context( - precision: Precision, - precision_config: Optional[dict[str, Any]], - deepspeed_enabled: bool, - fp8_autocast_enabled: bool = True, -): - if deepspeed_enabled: - return contextlib.nullcontext() - return get_precision_context(precision, precision_config, fp8_autocast_enabled) - - def _generate_run_name() -> str: # change coolname randomness for different names with same seed coolname.replace_random(random.Random(os.urandom(128))) @@ -790,33 +774,8 @@ class Trainer: It can be a path to a file on the local disk, a URL, or if ``load_object_store`` is set, the object name for a checkpoint in a cloud bucket. If a URI is specified, ``load_object_store`` does not need to be set. - When using `Deepspeed ZeRO `_, checkpoints are sharded by rank. - Instead of hard-coding the rank in the ``path``, use the following format variables: - - +------------------------+-------------------------------------------------------+ - | Variable | Description | - +========================+=======================================================+ - | ``{rank}`` | The global rank, as returned by | - | | :func:`~.dist.get_global_rank`. | - +------------------------+-------------------------------------------------------+ - | ``{local_rank}`` | The local rank of the process, as returned by | - | | :func:`~.dist.get_local_rank`. | - +------------------------+-------------------------------------------------------+ - | ``{node_rank}`` | The node rank, as returned by | - | | :func:`~.dist.get_node_rank`. | - +------------------------+-------------------------------------------------------+ - - For example, suppose that checkpoints are stored in the following structure: - - .. code-block:: - - my_model/ep1-rank0.tar - my_model/ep1-rank1.tar - my_model/ep1-rank2.tar - ... - - Then, ``load_path`` should be set to ``my_model/ep1-rank{rank}.tar``, and all ranks will load the - correct state. + When using FSDP with sharded checkpointing, checkpoint files are sharded by rank, and ``load_path`` + should be set to the directory containing sharded checkpoint files. If ``None`` then no checkpoint will be loaded. (default: ``None``) load_object_store (Union[ObjectStore, LoggerDestination], optional): If the ``load_path`` is in an @@ -985,11 +944,6 @@ class Trainer: to get the starting checkpoint. For any future restarts, such as due to the spot instance being killed, the loggers would be queried for the latest checkpoint the object store logger would be downloaded and used to resume training. - deepspeed_config (dict[str, Any], optional): Configuration for DeepSpeed, formatted as a JSON - according to `DeepSpeed's documentation `_. (default: ``None``) - - To use DeepSpeed with default values, set to the empty dictionary ``{}``. - To disable DeepSpeed (the default), set to ``None``. parallelism_config (Union[dict[str, Any], ParallelismConfig], optional): Configuration for parallelism options. Currently supports fsdp and tensor parallelism, whose respective configs are specified as the keys ``fsdp`` and ``tp``. (default: ``None``) @@ -1149,7 +1103,6 @@ def __init__( autoresume: bool = False, # Parallelism - deepspeed_config: Optional[dict[str, Any]] = None, parallelism_config: Optional[Union[dict[str, Any], ParallelismConfig]] = None, # System/Numerics @@ -1176,16 +1129,6 @@ def __init__( # compile config for PyTorch 2.0 or higher compile_config: Optional[dict[str, Any]] = None, ): - if deepspeed_config is not None: - warnings.warn( - VersionedDeprecationWarning( - 'The use of DeepSpeed for training new models in Composer is deprecated. Composer is tightly integrated with PyTorch FSDP ' - + - 'which provides similar functionality. Please use the `parallelism_config` parameter instead. Please open ' - + 'a GitHub issue if you need help migrating from DeepSpeed to FSDP.', - remove_version='0.27.0', - ), - ) self.auto_log_hparams = auto_log_hparams self.python_log_level = python_log_level @@ -1289,12 +1232,8 @@ def __init__( parallelism_config = ParallelismConfig( **parallelism_config_args, ) if len(parallelism_config_args) > 0 else None - if deepspeed_config is not None and parallelism_config is not None: - raise ValueError( - 'Both deepspeed_config and parallelism_config are specified but incompatible. Please specify only one.', - ) - if deepspeed_config is not None or parallelism_config is not None or dist.get_world_size() > 1: - # Deepspeed and FSDP both require torch.distributed to be initialized, even if the world size is 1 + if parallelism_config is not None or dist.get_world_size() > 1: + # FSDP requires torch.distributed to be initialized, even if the world size is 1 # And torch.distributed is always required for multi-rank training dist.initialize_dist(device, dist_timeout) if parallelism_config is not None: @@ -1334,7 +1273,7 @@ def __init__( raise NotImplementedError(f'Only one optimizer is supported; found {num_optimizers} optimizers') # Move the model and optimizers to the device - if deepspeed_config is None and parallelism_config is None: + if parallelism_config is None: # Check if model is already on tpu if isinstance(device, DeviceTPU) and 'xla' not in str(next(model.parameters()).device): raise ValueError( @@ -1370,7 +1309,6 @@ def __init__( optimizers=optimizers, run_name=run_name, save_metrics=save_metrics, - deepspeed_config=deepspeed_config, parallelism_config=parallelism_config, ) self.accumulate_train_batch_on_tokens = accumulate_train_batch_on_tokens @@ -1696,7 +1634,7 @@ def __init__( # suppressing FSDP warning when auto grad accum exits the forward pass before completing warnings.filterwarnings(action='ignore', message='Forward order differs from that of the first iteration') - # If using DDP or DeepSpeed, we need to wrap the ComposerModel but store a reference to the + # If using DDP, we need to wrap the ComposerModel but store a reference to the # original model for functions like `eval_forward`, `get_metrics`, etc. self._original_model = self.state.model @@ -1704,7 +1642,6 @@ def __init__( # If using TP, the model must be wrapped before FSDP. # If using FSDP, the model must be wrapped and then loaded unless loading a monolith # checkpoint on rank 0 only, in which case the model be loaded before it is wrapped. - # If using DeepSpeed, the engine must be initialized before the model is loaded. # TP wrap if self.state.tp_config is not None: @@ -1730,45 +1667,6 @@ def __init__( self.state.seed, ) - # Configure Deepspeed - if self.state.deepspeed_config is not None: - for callback in self.state.callbacks: - if isinstance(callback, OptimizerMonitor): - raise ValueError( - 'OptimizerMonitor is not supported with DeepSpeed because DeepSpeed clears ' - 'the gradients before in the last call to .backward see: ' - 'https://github.com/microsoft/DeepSpeed/issues/2329 for more details.', - ) - - try: - import deepspeed - except ImportError as e: - raise MissingConditionalImportError( - extra_deps_group='deepspeed', - conda_package='deepspeed>=0.5.5', - conda_channel=None, - ) from e - self.state.deepspeed_config = parse_deepspeed_config(self.state.deepspeed_config, state=self.state) - optimizer = ensure_tuple(self.state.optimizers)[0] - log.debug('Initializing deepspeed') - (self.state.model, self.state.optimizers, _, _) = deepspeed.initialize( - config=self.state.deepspeed_config, - model=self.state.model, - optimizer=optimizer, - ) - # Since the DeepSpeed ZeRO optimizer does not inherit torch.optim.Optimizer, the schedulers must be - # compiled and bound BEFORE DeepSpeed initialization. However, this is OK, as the the DeepSpeed Zero - # optimizer uses the same underlying parameter groups as the original optimizer. See - # * https://github.com/microsoft/DeepSpeed/blob/fee73135980e78f8be7e1a3ff556751623ef6aaa/deepspeed/runtime/zero/stage_1_and_2.py#L1911-L1917 - # * https://github.com/microsoft/DeepSpeed/blob/ef17c89570ceae5b26a5f886e9d8cd0941afc0ac/deepspeed/runtime/zero/stage3.py#L2532-L2538 - # In addition, the deepspeed engine is responsible for serializing the model and optimizer state, - # so these attributes should not be serialized with the composer state. - if 'model' in self.state.serialized_attributes: - self.state.serialized_attributes.remove('model') - - if 'optimizers' in self.state.serialized_attributes: - self.state.serialized_attributes.remove('optimizers') - self.engine.run_event(Event.BEFORE_LOAD) # Load Checkpoint @@ -1928,7 +1826,7 @@ def __init__( reproducibility.seed_all(self.state.seed) # DDP wrap if required - if not self.state.deepspeed_enabled and not self.state.fsdp_enabled and dist.get_world_size() > 1: + if not self.state.fsdp_enabled and dist.get_world_size() > 1: self.state.model = prepare_ddp_module(self.state.model, self._find_unused_parameters) # The model would need to be torch.compile()'d after being wrapped in a distributed strategy @@ -1949,8 +1847,8 @@ def saved_checkpoints(self) -> list[str]: .. note:: - For DeepSpeed, which saves file on every rank, only the files corresponding to the process's rank - will be shown. + For sharded checkpointing, which saves file on every rank, only the files corresponding + to the process's rank will be shown. """ if self._checkpoint_saver is None: return [] @@ -2012,87 +1910,61 @@ def _get_autoresume_checkpoint( f'Looking for autoresume checkpoint: {save_latest_remote_file_name} (remote), {latest_checkpoint_path} (local)', ) - if self.state.deepspeed_enabled: - # If latest checkpoint is not saved locally, try to fetch from loggers - if not os.path.exists(latest_checkpoint_path): - log.debug(f'Attempting to download the checkpoint on to rank {dist.get_global_rank()}') - os.makedirs(save_folder, exist_ok=True) - self._try_checkpoint_download( - latest_checkpoint_path, - save_latest_remote_file_name, - loggers, - load_progress_bar, - ) - - # List of whether the checkpoint exists on each rank - latest_checkpoint_exists = dist.all_gather_object(os.path.exists(latest_checkpoint_path)) - - if all(latest_checkpoint_exists): # All paths exist, so return the path. - return latest_checkpoint_path - # Require all ranks to have their own local checkpoint if we wish to restore from it for - # deepspeed or fsdp + sharding - elif any(latest_checkpoint_exists): # Some but not all exist, which is very bad. - missing_ranks = [n for (n, exist) in enumerate(latest_checkpoint_exists) if not exist] - mode = 'Deepspeed' if self.state.deepspeed_enabled else 'FSDP sharding' - raise RuntimeError(f'{mode} was enabled, but checkpoints missing on ranks: {missing_ranks}') - else: # None of the paths exists, so no autoresume necessary. - return None - else: - # broadcast the local checkpoint path to all ranks - latest_checkpoint_path_list = [os.path.abspath(latest_checkpoint_path)] - dist.broadcast_object_list(latest_checkpoint_path_list, src=0) - latest_checkpoint_path = latest_checkpoint_path_list[0] - - # broadcast the remote checkpoint path to all ranks - save_latest_remote_file_name_list = [save_latest_remote_file_name] - dist.broadcast_object_list(save_latest_remote_file_name_list, src=0) - save_latest_remote_file_name = save_latest_remote_file_name_list[0] - - # try to download the checkpoint on local rank 0 of all nodes - if dist.get_local_rank() == 0 and not os.path.exists(latest_checkpoint_path): - log.debug(f'Attempting to download the checkpoint {save_latest_remote_file_name} on to all nodes') - os.makedirs(save_folder, exist_ok=True) - self._try_checkpoint_download( - latest_checkpoint_path, - save_latest_remote_file_name, - loggers, - load_progress_bar, - ) - - signal_file_path = os.path.join( - os.path.dirname(latest_checkpoint_path), - dist.get_node_signal_file_name(), + # Broadcast the local checkpoint path to all ranks + latest_checkpoint_path_list = [os.path.abspath(latest_checkpoint_path)] + dist.broadcast_object_list(latest_checkpoint_path_list, src=0) + latest_checkpoint_path = latest_checkpoint_path_list[0] + + # Broadcast the remote checkpoint path to all ranks + save_latest_remote_file_name_list = [save_latest_remote_file_name] + dist.broadcast_object_list(save_latest_remote_file_name_list, src=0) + save_latest_remote_file_name = save_latest_remote_file_name_list[0] + + # Try to download the checkpoint on local rank 0 of all nodes + if dist.get_local_rank() == 0 and not os.path.exists(latest_checkpoint_path): + log.debug(f'Attempting to download the checkpoint {save_latest_remote_file_name} on to all nodes') + os.makedirs(save_folder, exist_ok=True) + self._try_checkpoint_download( + latest_checkpoint_path, + save_latest_remote_file_name, + loggers, + load_progress_bar, ) - if dist.get_local_rank() == 0: - os.makedirs(os.path.dirname(signal_file_path), exist_ok=True) - with open(signal_file_path, 'wb') as f: - f.write(b'local_rank0_completed_autoresume') - - # Avoid the collective call until the local rank zero has finished trying to download the checkpoint - # so that we don't timeout for large downloads. This syncs all processes on the node - with dist.local_rank_zero_download_and_wait(signal_file_path): - # Then, wait to ensure every node has finished downloading the checkpoint - dist.barrier() - - if dist.get_local_rank() == 0: - os.remove(signal_file_path) + + signal_file_path = os.path.join( + os.path.dirname(latest_checkpoint_path), + dist.get_node_signal_file_name(), + ) + if dist.get_local_rank() == 0: + os.makedirs(os.path.dirname(signal_file_path), exist_ok=True) + with open(signal_file_path, 'wb') as f: + f.write(b'local_rank0_completed_autoresume') + + # Avoid the collective call until the local rank zero has finished trying to download the checkpoint + # so that we don't timeout for large downloads. This syncs all processes on the node + with dist.local_rank_zero_download_and_wait(signal_file_path): + # Then, wait to ensure every node has finished downloading the checkpoint dist.barrier() - # At this point the rank 0 filepath should exist on all ranks if the download succeeded - # list of whether the checkpoint exists on each rank - latest_checkpoint_exists = dist.all_gather_object(os.path.exists(latest_checkpoint_path)) + if dist.get_local_rank() == 0: + os.remove(signal_file_path) + dist.barrier() - log.debug( - f'Checkpoint {latest_checkpoint_path} exists on rank {dist.get_global_rank()}? {os.path.exists(latest_checkpoint_path)}', - ) + # At this point the rank 0 filepath should exist on all ranks if the download succeeded + # list of whether the checkpoint exists on each rank + latest_checkpoint_exists = dist.all_gather_object(os.path.exists(latest_checkpoint_path)) + + log.debug( + f'Checkpoint {latest_checkpoint_path} exists on rank {dist.get_global_rank()}? {os.path.exists(latest_checkpoint_path)}', + ) - if not latest_checkpoint_exists[0]: - # If the checkpoint doesn't exist on rank 0, don't crash, so the initial autoresume run can succeed - return None - elif not all(latest_checkpoint_exists): - raise RuntimeError('Downloading the checkpoint to all nodes failed') + if not latest_checkpoint_exists[0]: + # If the checkpoint doesn't exist on rank 0, don't crash, so the initial autoresume run can succeed + return None + elif not all(latest_checkpoint_exists): + raise RuntimeError('Downloading the checkpoint to all nodes failed') - return latest_checkpoint_path + return latest_checkpoint_path def fit( self, @@ -2331,7 +2203,7 @@ def fit( # Evaluators if eval_dataloader is not None: # Need to use the `original_model` rather than `state.model`, as `state.model` - # could be DDP / DeepSpeed wrapped. + # could be DDP wrapped. eval_metrics = self._original_model.get_metrics(is_train=False) metric_names = [str(k) for k in eval_metrics.keys()] eval_dataloader = ensure_tuple(eval_dataloader) @@ -2414,8 +2286,6 @@ def fit( # Precision if precision is not None: if Precision(precision) != self.state.precision: - if self.state.deepspeed_enabled: - raise ValueError('Changing the precision when using DeepSpeed is not supported') precision = Precision(precision) _validate_precision(precision, self.state.device) self.state.precision = precision @@ -2460,14 +2330,7 @@ def _ensure_metrics_device_and_dtype( metrics[name] = DeviceCPU().module_to_device(metric) else: metrics[name] = self.state.device.module_to_device(metric) - if is_model_deepspeed(self.state.model): - # HACK: DeepSpeed somehow manages to convert metric internal states to its own dtype. When - # running with FP16, this tends to result in overflows. Let's assume FP32 is good enough. - for key in metric._defaults: - metric_data = getattr(metric, key) - if isinstance(metric_data, torch.Tensor) and metric_data.dtype == torch.float16: - metric_data = metric_data.to(torch.float32) # type: ignore - setattr(metric, key, metric_data) + return metrics def _compute_and_log_metrics(self, dataloader_label: str, metrics: dict[str, Metric]): @@ -2626,9 +2489,6 @@ def _train_loop(self) -> None: rank_num_samples = self._train_data_spec.get_num_samples_in_batch(self.state.batch) rank_num_tokens = self._train_data_spec.get_num_tokens_in_batch(self.state.batch) - if self.state.deepspeed_enabled: - self.state.batch = fix_batch_precision_for_deepspeed(self.state.batch, self.state.precision) - self.engine.run_event(Event.AFTER_DATALOADER) self.engine.run_event(Event.BATCH_START) @@ -2791,7 +2651,7 @@ def _eval_train_metrics(self, device_batch): # https://github.com/NVIDIA/TransformerEngine/blob/8e039fdcd98fc56582d81e373880c1509c2b8f73/transformer_engine/pytorch/module/base.py#L495-L513 for more info. with torch.no_grad(),\ model_eval_mode(self.state.model),\ - _get_precision_context(self.state.precision, self.state.precision_config, self.state.deepspeed_enabled, fp8_autocast_enabled=False): + get_precision_context(self.state.precision, self.state.precision_config, fp8_autocast_enabled=False): eval_outputs = self._original_model.eval_forward(device_batch, self.state.outputs) for metric in self.state.train_metrics.values(): self._original_model.update_metric( @@ -2875,12 +2735,11 @@ def _train_batch(self, use_grad_scaling: bool) -> dict[str, torch.Tensor]: ) else: self._train_microbatches(microbatches, total_loss_dict) - if not self.state.deepspeed_enabled: - for optimizer in self.state.optimizers: - if use_grad_scaling: - self.state.scaler.step(optimizer) - else: - optimizer.step() + for optimizer in self.state.optimizers: + if use_grad_scaling: + self.state.scaler.step(optimizer) + else: + optimizer.step() except RuntimeError as e: if self.state.auto_microbatching and str(e) == OOM_FOUND_ON_OTHER_RANK: log.debug((f"A Different Rank OOM'd.")) @@ -3010,12 +2869,11 @@ def _train_microbatches( use_grad_scaling = self._use_grad_scaling(self.state.precision, self.state.scaler) - if not self.state.deepspeed_enabled: - for optimizer in self.state.optimizers: - try: - optimizer.zero_grad(set_to_none=True) - except TypeError: - optimizer.zero_grad() + for optimizer in self.state.optimizers: + try: + optimizer.zero_grad(set_to_none=True) + except TypeError: + optimizer.zero_grad() # Tracker for gradient accumulation if self.accumulate_train_batch_on_tokens: @@ -3084,7 +2942,7 @@ def _train_microbatch( ) else: microbatch_size = self._train_data_spec.get_num_samples_in_batch(self.state.batch) - if self.state.deepspeed_enabled or not isinstance(self.state.model, DistributedDataParallel): + if not isinstance(self.state.model, DistributedDataParallel): sync_context = contextlib.nullcontext() elif self.state.auto_microbatching and not self.first_train_batch_complete: # PyTorch DDP rebuilds gradient reduction buckets after 1) a forward pass where the @@ -3110,10 +2968,9 @@ def _train_microbatch( # Forward pass self.engine.run_event(Event.BEFORE_FORWARD) - with _get_precision_context( + with get_precision_context( self.state.precision, self.state.precision_config, - self.state.deepspeed_enabled, ): self.state.outputs = self.state.model(self.state.batch) @@ -3136,10 +2993,9 @@ def _train_microbatch( # Loss self.engine.run_event(Event.BEFORE_LOSS) - with _get_precision_context( + with get_precision_context( self.state.precision, self.state.precision_config, - self.state.deepspeed_enabled, ): self.state.loss = self._original_model.loss(self.state.outputs, self.state.batch) @@ -3178,12 +3034,9 @@ def _train_microbatch( if use_grad_scaling: microbatch_loss = cast(torch.Tensor, self.state.scaler.scale(microbatch_loss)) # type: ignore - if self.state.deepspeed_enabled: - self.state.deepspeed_model.backward(microbatch_loss) - else: - # Scale loss based on the number of samples in the microbatch to maintain gradient numerics - microbatch_loss.mul_(microbatch_size / current_batch_size) - microbatch_loss.backward(create_graph=self._backwards_create_graph) + # Scale loss based on the number of samples in the microbatch to maintain gradient numerics + microbatch_loss.mul_(microbatch_size / current_batch_size) + microbatch_loss.backward(create_graph=self._backwards_create_graph) if self.state.device.dist_backend == 'xla': # For xla devices, the program between any pair of mark_steps() calls is compiled. With out this, the @@ -3200,9 +3053,6 @@ def _train_microbatch( self.state.train_metrics = self._ensure_metrics_device_and_dtype(self.state.train_metrics) self._eval_train_metrics(device_batch) - if self.state.deepspeed_enabled: - self.state.deepspeed_model.step() - return microbatch_loss_dict def _increment_iteration(self): @@ -3317,17 +3167,12 @@ def predict_batch_end(self, state: State, logger: Logger) -> None: rank_num_samples = data_spec.get_num_samples_in_batch(self.state.batch) rank_num_tokens = data_spec.get_num_tokens_in_batch(self.state.batch) - # Fix the batch if using DeepSpeed - if self.state.deepspeed_enabled: - self.state.batch = fix_batch_precision_for_deepspeed(self.state.batch, self.state.precision) - self.engine.run_event(Event.PREDICT_BATCH_START) self.engine.run_event(Event.PREDICT_BEFORE_FORWARD) - with _get_precision_context( + with get_precision_context( self.state.precision, self.state.precision_config, - self.state.deepspeed_enabled, ): self.state.outputs = self.state.model(self.state.batch) self.engine.run_event(Event.PREDICT_AFTER_FORWARD) @@ -3602,9 +3447,6 @@ def _eval_loop( raise ValueError('Number of samples in a batch should be an integer.') last_batch = self.state.eval_timestamp.sample + batch_num_samples >= dataset_len - if self.state.deepspeed_enabled: - self.state.batch = fix_batch_precision_for_deepspeed(self.state.batch, self.state.precision) - self.engine.run_event(Event.EVAL_BATCH_START) # Cache the device batch, because `self.state.batch` gets overridden in microbatching loop @@ -3646,10 +3488,9 @@ def _eval_loop( # Note: the activation dtype is BF16 if FSDP Mixed Precision PURE is enabled and FP32 if FSDP Mixed Precision FULL is enabled. # See https://github.com/NVIDIA/TransformerEngine/blob/8e039fdcd98fc56582d81e373880c1509c2b8f73/transformer_engine/pytorch/module/linear.py#L250-L252 and \ # https://github.com/NVIDIA/TransformerEngine/blob/8e039fdcd98fc56582d81e373880c1509c2b8f73/transformer_engine/pytorch/module/base.py#L495-L513 for more info. - with _get_precision_context( + with get_precision_context( self.state.precision, self.state.precision_config, - self.state.deepspeed_enabled, fp8_autocast_enabled=False, ): self.state.outputs = self._original_model.eval_forward(self.state.batch) @@ -3662,10 +3503,9 @@ def _eval_loop( continue # Run in same precision context to avoid NaNs - with _get_precision_context( + with get_precision_context( self.state.precision, self.state.precision_config, - self.state.deepspeed_enabled, ): if isinstance(self.state.device, DeviceMPS): # torchmetrics math has numerical errors on M1 devices @@ -3786,9 +3626,6 @@ def _use_grad_scaling(self, precision: Union[str, Precision], scaler: Optional[G Occurs when attempting to use grad scaling without the scaler enabled. Likely due to hardware not supporting the provided precision. """ - if self.state.deepspeed_enabled: - return False - precision = Precision(precision) use_grad_scaling = precision == Precision.AMP_FP16 @@ -3852,9 +3689,6 @@ def _use_closures(self) -> bool: We default to using closures unless AMP is enabled, in which case we only allow closures when using optimizers with the _step_supports_amp_closure flag. """ - if self.state.deepspeed_enabled: - return False - if self.state.device.dist_backend == 'xla': return False diff --git a/composer/utils/__init__.py b/composer/utils/__init__.py index 283ab446c1..1b115bb6ff 100644 --- a/composer/utils/__init__.py +++ b/composer/utils/__init__.py @@ -56,7 +56,6 @@ add_vision_dataset_transform, create_interval_scheduler, get_free_tcp_port, - is_model_deepspeed, is_model_fsdp, is_notebook, model_eval_mode, @@ -104,7 +103,6 @@ 'MissingConditionalImportError', 'get_save_filename', 'import_object', - 'is_model_deepspeed', 'is_model_fsdp', 'is_notebook', 'StringEnum', diff --git a/composer/utils/checkpoint.py b/composer/utils/checkpoint.py index baade3dbea..95f4b47614 100644 --- a/composer/utils/checkpoint.py +++ b/composer/utils/checkpoint.py @@ -41,7 +41,7 @@ maybe_create_object_store_from_uri, parse_uri, ) -from composer.utils.misc import ParallelismType, is_model_deepspeed, partial_format +from composer.utils.misc import ParallelismType, partial_format from composer.utils.object_store import ObjectStore from composer.utils.retrying import retry @@ -54,7 +54,6 @@ __all__ = ['get_save_filename', 'load_checkpoint', 'save_checkpoint', 'download_checkpoint'] _COMPOSER_STATES_FILENAME = 'composer_states.pt' -_DEEPSPEED_TAG = 'deepspeed' # always tag with the same, deterministic name. We'll rename the tarball to the appropriate name. _TORCH_DISTRIBUTED_CHECKPOINTS_FILENAME = f'__{dist.get_global_rank()}_0.distcp' _TORCH_DISTRIBUTED_CHECKPOINTS_METADATA_FILENAME = '.metadata' @@ -126,15 +125,6 @@ def _format_path_with_rank_zero(path: str) -> str: ) -def _format_path_with_current_rank(path: str) -> str: - """Formats ``path`` formatted with the current rank values.""" - return path.format( - rank=dist.get_global_rank(), - local_rank=dist.get_local_rank(), - node_rank=dist.get_node_rank(), - ) - - def _get_write_mode(name: str) -> str: """Get the write mode to use with :func:`tarfile.open`.""" if name.endswith('.tar'): @@ -388,29 +378,27 @@ def __init__(self, filename: str, folder: Optional[str] = None): self.folder = folder self.filename = filename - def format(self, state: State, is_deepspeed: bool = False, keep_placeholders: bool = False) -> str: - # if filename already has a suffix (e.g. file.pt), this would append to be file.pt.tar - extra_suffix = '.tar' if is_deepspeed and not is_tar(self.filename) else '' + def format(self, state: State, keep_placeholders: bool = False) -> str: if self.folder: if keep_placeholders: return os.path.join( self.folder, self.filename, - ) + extra_suffix + ) else: return os.path.join( format_name_with_dist(self.folder, state.run_name), format_name_with_dist_and_time(self.filename, state.run_name, state.timestamp), - ) + extra_suffix + ) else: if keep_placeholders: - return self.filename + extra_suffix + return self.filename else: return format_name_with_dist_and_time( self.filename, state.run_name, state.timestamp, - ) + extra_suffix + ) def load_checkpoint( @@ -433,33 +421,8 @@ def load_checkpoint( It can be a path to a file on the local disk, a URL, or if ``object_store`` is set, the object name for a checkpoint in a cloud bucket. - When using `Deepspeed ZeRO `_, checkpoints are sharded by rank. - Instead of hard-coding the rank in the ``path``, use the following format variables: - - +------------------------+-------------------------------------------------------+ - | Variable | Description | - +========================+=======================================================+ - | ``{rank}`` | The global rank, as returned by | - | | :func:`~.dist.get_global_rank`. | - +------------------------+-------------------------------------------------------+ - | ``{local_rank}`` | The local rank of the process, as returned by | - | | :func:`~.dist.get_local_rank`. | - +------------------------+-------------------------------------------------------+ - | ``{node_rank}`` | The node rank, as returned by | - | | :func:`~.dist.get_node_rank`. | - +------------------------+-------------------------------------------------------+ - - For example, suppose that checkpoints are stored in the following structure: - - .. code-block:: - - my_model/ep1-rank0.tar - my_model/ep1-rank1.tar - my_model/ep1-rank2.tar - ... - - Then, ``path`` should be set to ``my_model/ep1-rank{rank}.tar``, and all ranks will load the - correct state. + When using FSDP with sharded checkpointing, checkpoint files are sharded by rank, and ``load_path`` + should be set to the directory containing sharded checkpoint files. state (State): The :class:`~composer.core.State` to load the checkpoint into. logger (Logger): The :class:`~composer.logger.Logger` to log any information. @@ -545,7 +508,6 @@ def load_checkpoint( node_checkpoint_folder=node_checkpoint_folder, object_store=object_store, progress_bar=progress_bar, - deepspeed_sharded_checkpoint=is_model_deepspeed(state.model), ) rng_state_dicts = _restore_checkpoint( state, @@ -775,7 +737,6 @@ def download_checkpoint( node_checkpoint_folder: str, object_store: Optional[Union[ObjectStore, LoggerDestination]], progress_bar: bool, - deepspeed_sharded_checkpoint: bool = False, ) -> tuple[str, Optional[str], bool]: """Download the checkpoint stored at ``path``, potentially in ``object_store``, to ``node_checkpoint_folder``. @@ -783,14 +744,12 @@ def download_checkpoint( * The ``composer_states_filepath``, is the path to the composer states, which can be passed into :meth:`torch.load`. - * The ``extracted_checkpoint_folder`` is the path to the checkpoint folder, which can be passed into - :meth:`deepspeed.DeepSpeedEngine.load_checkpoint`. + * The ``extracted_checkpoint_folder`` is the path to the checkpoint folder. * The ``extracted_rank_n`` is a boolean flag indicating whether a tarball was extracted on global rank greater than 0. """ log.debug('Downloading checkpoint to folder %s', node_checkpoint_folder) rank_zero_checkpoint_filepath = os.path.join(node_checkpoint_folder, 'rank0_checkpoint') - rank_n_checkpoint_filepath = os.path.join(node_checkpoint_folder, f'rank{dist.get_global_rank()}_checkpoint') extracted_checkpoint_folder = None extracted_rank_n = False if is_tar(path): @@ -812,7 +771,7 @@ def download_checkpoint( shutil.copyfileobj(in_file, out_file) try: - if not deepspeed_sharded_checkpoint and dist.get_local_rank() == 0: + if dist.get_local_rank() == 0: # If the checkpoint is not sharded, then local rank 0 on each node needs to download the # global rank 0 checkpoint path = _format_path_with_rank_zero(path) @@ -831,54 +790,26 @@ def download_checkpoint( # the underlying issue is that the checkpoint file does not exist on the disk # or could not be downloaded raise RuntimeError(f'Checkpoint {path} does not exist') - elif deepspeed_sharded_checkpoint: - # If the checkpoint is sharded, then every rank needs to download its own checkpoint - path = _format_path_with_current_rank(path) - try: - get_file( - destination=rank_n_checkpoint_filepath, - path=path, - object_store=object_store, - progress_bar=progress_bar, - ) - except FileNotFoundError as e: - raise FileNotFoundError(( - f'Checkpoint {path} does not exist, but is required for sharded checkpointing ' - f'on rank {dist.get_global_rank()}. Please ensure that the checkpoint exists ' - 'and your load_path was specified as a format string with the {rank} argument.' - )) from e - - if extracted_checkpoint_folder is not None: - try: - # it's an archive and needs to be extracted - with tarfile.open(rank_n_checkpoint_filepath) as tarball: - tarball.extractall(extracted_checkpoint_folder) - extracted_rank_n = True - except FileNotFoundError: - # this will happen most of the time (i.e. whenever deepspeed - # is not being used) so not logging anything - pass finally: # Use busy wait to avoid timeouts on large downloads for non-sharded checkpoints - if not deepspeed_sharded_checkpoint: - signal_file_path = os.path.join( - node_checkpoint_folder, - dist.get_node_signal_file_name(), - ) - if dist.get_local_rank() == 0: - with open(signal_file_path, 'wb') as f: - f.write(b'local_rank0_completed') - - # Avoid the collective call until the local rank zero has finished trying to download the - # checkpoint so that we don't timeout for large downloads. This syncs all processes on the - # node - with dist.local_rank_zero_download_and_wait(signal_file_path): - # Then, wait to ensure every node has finished downloading the checkpoint - dist.barrier() + signal_file_path = os.path.join( + node_checkpoint_folder, + dist.get_node_signal_file_name(), + ) + if dist.get_local_rank() == 0: + with open(signal_file_path, 'wb') as f: + f.write(b'local_rank0_completed') + + # Avoid the collective call until the local rank zero has finished trying to download the + # checkpoint so that we don't timeout for large downloads. This syncs all processes on the + # node + with dist.local_rank_zero_download_and_wait(signal_file_path): + # Then, wait to ensure every node has finished downloading the checkpoint + dist.barrier() - if dist.get_local_rank() == 0: - os.remove(signal_file_path) + if dist.get_local_rank() == 0: + os.remove(signal_file_path) dist.barrier() return composer_states_filepath, extracted_checkpoint_folder, extracted_rank_n @@ -1055,23 +986,7 @@ def _restore_checkpoint( state_dict['state'] = state_dict.get('state', {}) log.debug(f"Loaded checkpoint with keys {state_dict.keys()} and state keys {state_dict['state'].keys()}") - if is_model_deepspeed(state.model): - if extracted_checkpoint_folder is None: - raise RuntimeError('Deepspeed checkpoints require a tarball, not a weights file.') - - global_rank = dist.get_global_rank() - if global_rank > 0 and not extracted_rank_n: - raise RuntimeError(f'Deepspeed checkpoint missing for rank {global_rank}') - - load_path, _ = state.deepspeed_model.load_checkpoint( - extracted_checkpoint_folder, - tag=_DEEPSPEED_TAG, - load_module_only=load_weights_only, - load_module_strict=strict_model_weights, - ) - if load_path is None: - raise RuntimeError(f'Failed to load DeepSpeed checkpoint') - elif load_weights_only: + if load_weights_only: state.load_model_state( state_dict['state'], logger, @@ -1079,7 +994,7 @@ def _restore_checkpoint( exclude_algorithms=exclude_algorithms, algorithm_passes=algorithm_passes, ) - if not load_weights_only: + else: state.load_state_dict( state_dict['state'], logger, @@ -1103,8 +1018,7 @@ def get_save_filename( Full filename of save file. """ if not state.fsdp_sharded_state_dict_enabled: - is_deepspeed = is_model_deepspeed(state.model) - return PartialFilePath(filename).format(state, is_deepspeed) + return PartialFilePath(filename).format(state) # Sharded checkpoints get their own little folder. assert state.fsdp_config is not None @@ -1126,9 +1040,7 @@ def _save_checkpoint( ignore_keys: Optional[Union[list[str], Callable[[dict], None]]] = None, ) -> Union[str, None]: # noqa: D103 - is_deepspeed = is_model_deepspeed(state.model) - - if weights_only and not is_deepspeed: + if weights_only: state_dict = { 'state': { 'model': state.get_model_state_dict(), @@ -1171,16 +1083,8 @@ def _save_checkpoint( # Only some ranks are meant to save checkpoint and produce a file expect_file = False - # Save deepspeed checkpoint - if is_deepspeed: - expect_file = True - log.debug('Saving deepspeed checkpoints to %s...', save_filename) - if dist.get_global_rank() == 0: - _write_checkpoint_file(state_dict, save_filename) - - _save_deepspeed_model(state.deepspeed_model, save_filename) # Save sharded checkpoint - elif state.fsdp_sharded_state_dict_enabled: + if state.fsdp_sharded_state_dict_enabled: if state.fsdp_config is None: raise ValueError('Saving a sharded checkpoint requires passing an FSDP config to Trainer.') @@ -1269,24 +1173,6 @@ def _write_checkpoint_file(state_dict: dict[str, Any], filename: str) -> None: torch.save(state_dict, f) -def _save_deepspeed_model(model, filename: str): - """Save Deepspeed model and tarball the files.""" - write_mode = _get_write_mode(filename) - read_mode = 'r' + write_mode[1:] - - with tempfile.TemporaryDirectory() as tmpdir: - model.save_checkpoint(tmpdir, _DEEPSPEED_TAG) - - if os.path.exists(filename): - # extract to tmpdir to append below - # not all compression formats support direct append - with tarfile.open(filename, read_mode) as tar: - tar.extractall(tmpdir) - - with tarfile.open(filename, write_mode) as tar: - tar.add(tmpdir, arcname='') - - def save_checkpoint( state: State, filename: str = 'ep{epoch}-ba{batch}-rank{rank}', @@ -1316,20 +1202,13 @@ def save_checkpoint( * By default, only the rank zero process will save a checkpoint file. - * When using DeepSpeed, each rank will save a checkpoint file in tarball format. DeepSpeed - requires tarball format, as it saves model and optimizer states in separate files. - Ensure that ``'{{rank}}'`` appears within the ``filename``. Otherwise, multiple ranks - may attempt to write to the same file(s), leading to corrupted checkpoints. If no tarball file - extension is specified, ``.tar`` will be used. + * To write to compressed tar files, set the file extension to ``'.tar.gz'``, ``'.tgz'``, + ``'.tar.bz2'``, or ``'.tar.lzma'`` (depending on the desired compression algorithm). - * To write to compressed tar files (regardless of whether DeepSpeed is enabled), set the file - extension to ``'.tar.gz'``, ``'.tgz'``, ``'.tar.bz2'``, or ``'.tar.lzma'`` (depending on the - desired compression algorithm). - - * To write to compressed pt files (when DeepSpeed is disabled), set the file extension to - ``'.pt.bz2'``, ``'.pt.gz'``, ``'.pt.lz4'``, ``'.pt.lzma'``, ``'.pt.lzo'``, ``'.pt.xz'``, ``'.pt.zst'`` - (depending on the desired algorithm). You must have the corresponding CLI tool installed. - ``lz4`` is a good choice for a modest space saving while being very fast to compress. + * To write to compressed pt files, set the file extension to ``'.pt.bz2'``, ``'.pt.gz'``, + ``'.pt.lz4'``, ``'.pt.lzma'``, ``'.pt.lzo'``, ``'.pt.xz'``, ``'.pt.zst'`` (depending on the + desired algorithm). You must have the corresponding CLI tool installed. ``lz4`` is a good + choice for a modest space saving while being very fast to compress. .. warning:: @@ -1347,31 +1226,18 @@ def save_checkpoint( * The current epoch count is ``1``. * The current batch count is ``42``. - When DeepSpeed is not being used, the rank zero process will save the checkpoint to ``'ep1-ba42-rank0'``. - When DeepSpeed is being used, each rank (process) will save checkpoints to:: - - ep1-ba42-rank0.tar - ep1-ba42-rank1.tar - ep1-ba42-rank2.tar - ... + The rank zero process will save the checkpoint to ``'ep1-ba42-rank0'``. weights_only (bool, optional): If ``True``, save only the model weights instead of the entire training state. (default: ``False``) - .. note:: - - When using DeepSpeed, this parameter must be ``False``. Weights-only checkpointing is not currently - compatible with DeepSpeed, - Returns: list[pathlib.Path]: The list of checkpoint files saved, indexed by the rank of the process. .. note:: - When using DeepSpeed, each process (rank) saves its own checkpoint file. When doing multi-node training, the filepaths are valid only on each process's node; Composer does not move checkpoint files between nodes. - Otherwise, when not using DeepSpeed, each list will contain only one filepath, - since only the rank zero process saves checkpoints. + Each list will contain only one filepath since only the rank zero process saves checkpoints. """ diff --git a/composer/utils/inference.py b/composer/utils/inference.py index 354dbdddf5..e57d8ab0ba 100644 --- a/composer/utils/inference.py +++ b/composer/utils/inference.py @@ -22,7 +22,7 @@ from composer.utils.checkpoint import download_checkpoint, safe_torch_load from composer.utils.device import get_device from composer.utils.iter_helpers import ensure_tuple -from composer.utils.misc import is_model_ddp, is_model_deepspeed, is_model_fsdp, model_eval_mode +from composer.utils.misc import is_model_ddp, is_model_fsdp, model_eval_mode from composer.utils.object_store import ObjectStore from composer.utils.string_enum import StringEnum @@ -146,9 +146,6 @@ def export_for_inference( """ save_format = ExportFormat(save_format) - if is_model_deepspeed(model): - raise ValueError(f'Exporting for deepspeed models is currently not supported.') - if is_model_ddp(model): raise ValueError( f'Directly exporting a DistributedDataParallel model is not supported. Export the module instead.', diff --git a/composer/utils/misc.py b/composer/utils/misc.py index 84ab1becd2..56f044931b 100644 --- a/composer/utils/misc.py +++ b/composer/utils/misc.py @@ -21,7 +21,6 @@ from composer.core import Event, State, Time __all__ = [ - 'is_model_deepspeed', 'is_model_fsdp', 'is_notebook', 'warning_on_one_line', @@ -185,16 +184,6 @@ def check_interval(state: State, event: Event): return check_interval -def is_model_deepspeed(model: torch.nn.Module) -> bool: - """Whether ``model`` is an instance of a :class:`~deepspeed.DeepSpeedEngine`.""" - try: - import deepspeed - except ImportError: - return False - else: - return isinstance(model, deepspeed.DeepSpeedEngine) - - def is_model_ddp(model: torch.nn.Module) -> bool: """Whether ``model`` is an instance of a :class:`.DistributedDataParallel`.""" return isinstance(model, DistributedDataParallel) diff --git a/composer/utils/module_surgery.py b/composer/utils/module_surgery.py index b70fd793d1..4385c6d597 100644 --- a/composer/utils/module_surgery.py +++ b/composer/utils/module_surgery.py @@ -153,20 +153,7 @@ def replace_module_classes( `module.module` and re-wrap the `module.module` with `torch.nn.parallel.DistributedDataParallel`""", ), ) - try: - import deepspeed - except ImportError: - pass - else: - if isinstance(module, deepspeed.DeepSpeedEngine): - raise TypeError( - textwrap.dedent( - """\ - Surgery is not supported after a module is wrapped with - `deepspeed.DeepSpeedEngine` Instead, please perform surgery on the underlying module`, - and re-wrap it with `deepspeed.DeepSpeedEngine`""", - ), - ) + replaced_pairs = {} children_to_parents_and_names: OrderedDict[torch.nn.Module, list[tuple[torch.nn.Module, str]], diff --git a/docker/README.md b/docker/README.md index 41bd0e51b6..f0da230258 100644 --- a/docker/README.md +++ b/docker/README.md @@ -15,8 +15,8 @@ all dependencies for both NLP and Vision models. They are built on top of the | Composer Version | CUDA Support | Docker Tag | |--------------------|----------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.27.0 | Yes | `mosaicml/composer:latest`, `mosaicml/composer:0.27.0` | -| 0.27.0 | No | `mosaicml/composer:latest_cpu`, `mosaicml/composer:0.27.0_cpu` | +| 0.28.0 | Yes | `mosaicml/composer:latest`, `mosaicml/composer:0.28.0` | +| 0.28.0 | No | `mosaicml/composer:latest_cpu`, `mosaicml/composer:0.28.0_cpu` | **Note**: For a lightweight installation, we recommended using a [MosaicML PyTorch Image](#pytorch-images) and manually diff --git a/docker/build_matrix.yaml b/docker/build_matrix.yaml index b062299eca..d79f12a1ec 100644 --- a/docker/build_matrix.yaml +++ b/docker/build_matrix.yaml @@ -168,9 +168,9 @@ TORCHVISION_VERSION: 0.18.1 - AWS_OFI_NCCL_VERSION: '' BASE_IMAGE: nvidia/cuda:12.4.1-cudnn-devel-ubuntu22.04 - COMPOSER_INSTALL_COMMAND: mosaicml[all]==0.27.0 + COMPOSER_INSTALL_COMMAND: mosaicml[all]==0.28.0 CUDA_VERSION: 12.4.1 - IMAGE_NAME: composer-0-27-0 + IMAGE_NAME: composer-0-28-0 MOFED_VERSION: latest-23.10 NVIDIA_REQUIRE_CUDA_OVERRIDE: '' PYTHON_VERSION: '3.11' @@ -178,17 +178,17 @@ PYTORCH_NIGHTLY_VERSION: '' PYTORCH_VERSION: 2.5.1 TAGS: - - mosaicml/composer:0.27.0 - - ghcr.io/databricks-mosaic/composer:0.27.0 + - mosaicml/composer:0.28.0 + - ghcr.io/databricks-mosaic/composer:0.28.0 - mosaicml/composer:latest - ghcr.io/databricks-mosaic/composer:latest TARGET: composer_stage TORCHVISION_VERSION: 0.20.1 - AWS_OFI_NCCL_VERSION: '' BASE_IMAGE: ubuntu:22.04 - COMPOSER_INSTALL_COMMAND: mosaicml[all]==0.27.0 + COMPOSER_INSTALL_COMMAND: mosaicml[all]==0.28.0 CUDA_VERSION: '' - IMAGE_NAME: composer-0-27-0-cpu + IMAGE_NAME: composer-0-28-0-cpu MOFED_VERSION: latest-23.10 NVIDIA_REQUIRE_CUDA_OVERRIDE: '' PYTHON_VERSION: '3.11' @@ -196,8 +196,8 @@ PYTORCH_NIGHTLY_VERSION: '' PYTORCH_VERSION: 2.5.1 TAGS: - - mosaicml/composer:0.27.0_cpu - - ghcr.io/databricks-mosaic/composer:0.27.0_cpu + - mosaicml/composer:0.28.0_cpu + - ghcr.io/databricks-mosaic/composer:0.28.0_cpu - mosaicml/composer:latest_cpu - ghcr.io/databricks-mosaic/composer:latest_cpu TARGET: composer_stage diff --git a/docker/generate_build_matrix.py b/docker/generate_build_matrix.py index 1fb476b351..1f45638c97 100644 --- a/docker/generate_build_matrix.py +++ b/docker/generate_build_matrix.py @@ -244,7 +244,7 @@ def _main(): composer_entries = [] # The `GIT_COMMIT` is a placeholder and Jenkins will substitute it with the actual git commit for the `composer_staging` images - composer_versions = ['0.27.0'] # Only build images for the latest composer version + composer_versions = ['0.28.0'] # Only build images for the latest composer version composer_python_versions = [PRODUCTION_PYTHON_VERSION] # just build composer against the latest for product in itertools.product(composer_python_versions, composer_versions, cuda_options): diff --git a/docs/source/doctest_fixtures.py b/docs/source/doctest_fixtures.py index 60dea99ff9..c390d63961 100644 --- a/docs/source/doctest_fixtures.py +++ b/docs/source/doctest_fixtures.py @@ -61,8 +61,6 @@ # Ignore certain warnings for doctest warnings.filterwarnings(action='ignore', message='.*Deterministic mode.*') # Expected warnings.filterwarnings(action='ignore', message='.*Some weights of Bert*') # Expected -warnings.filterwarnings(action='ignore', message='.*torch.cuda.amp.custom.*') # DeepSpeed -warnings.filterwarnings(action='ignore', message='.*The distutils.sysconfig module*') # DeepSpeed try: import wandb diff --git a/docs/source/getting_started/installation.rst b/docs/source/getting_started/installation.rst index 100247983a..36af48b3ab 100644 --- a/docs/source/getting_started/installation.rst +++ b/docs/source/getting_started/installation.rst @@ -18,7 +18,6 @@ the following installation targets are available: * ``pip install 'mosaicml[dev]'``: Installs development dependencies, which are required for running tests and building documentation. -* ``pip install 'mosaicml[deepspeed]'``: Installs Composer with support for :mod:`deepspeed`. * ``pip install 'mosaicml[nlp]'``: Installs Composer with support for NLP models and algorithms. * ``pip install 'mosaicml[wandb]'``: Installs Composer with support for :mod:`wandb`. * ``pip install 'mosaicml[comet_ml]'``: Installs Composer with support for :mod:`comet_ml`. diff --git a/docs/source/notes/distributed_training.rst b/docs/source/notes/distributed_training.rst index 9422e4280b..d930bd1fc3 100644 --- a/docs/source/notes/distributed_training.rst +++ b/docs/source/notes/distributed_training.rst @@ -17,8 +17,8 @@ greatly simplifies model building and memory management. Every GPU is performing the same work, so inspecting the rank zero is sufficient to reason about memory, performance, and other properties. -Within Composer, we have three options for data-parallelism-only -execution: `Pytorch DDP`_ (default), `Pytorch FSDP`_, and `DeepSpeed Zero`_. +Within Composer, we have two options for data-parallelism-only +execution: `Pytorch DDP`_ (default) and `Pytorch FSDP`_. Although Pytorch DDP is the default, Pytorch FSDP increases memory and computational efficiency when configured correctly while producing the same results and is the recommended option. @@ -121,53 +121,6 @@ DistributedSampler is not supported as IterableDatasets need to handle multi-wor training internally. See IterableDataset [docs](https://pytorch.org/docs/stable/data.html#torch.utils.data.IterableDataset) for more information -Deepspeed ---------- - -Composer comes with DeepSpeed support, allowing you to leverage their -full set of features that makes it easier to train large models across -(1) any type of GPU and (2) multiple nodes. For more details on DeepSpeed, -see `their website `__. - -We support optimizer and gradient sharing via -`Deepspeed Zero`_ stages 1 and 2 respectively. In the future, we'll support model -sharding via Zero-3. These methods reduce model state memory by a -factor of (1 / the number of data-parallel devices). - -To enable DeepSpeed, simply pass in a config as specified in the -DeepSpeed docs `here `__. - -.. code:: python - - # run_trainer.py - - from composer import Trainer - - trainer = Trainer( - model=model, - train_dataloader=train_dataloader, - eval_dataloader=eval_dataloader, - max_duration='160ep', - device='gpu', - deepspeed_config={ - "train_batch_size": 2048, - "fp16": {"enabled": True}, - }) - -Providing an empty dictionary to deepspeed is also valid. The deepspeed -defaults will be used and other fields (such as precision) will be inferred -from the trainer. - -.. warning:: - - The ``deepspeed_config`` must not conflict with any other parameters - passed to the trainer. - -.. warning:: - - Not all algorithms have been tested with Deepspeed, please proceed with - caution. - FullyShardedDataParallel (FSDP) ------------------------------- @@ -640,5 +593,4 @@ An example code snippet for using TP and FSDP with Composer is provided below: This is an experimental feature and is subject to change. Many features, such as `load_monolith_rank0_only` or tensor parallelism without FSDP, are not yet supported. .. _Pytorch DDP: https://pytorch.org/docs/master/generated/torch.nn.parallel.DistributedDataParallel.html -.. _Deepspeed Zero: https://www.deepspeed.ai/ .. _Pytorch FSDP: https://pytorch.org/docs/stable/fsdp.html diff --git a/docs/source/trainer/using_the_trainer.rst b/docs/source/trainer/using_the_trainer.rst index 325f9b95f4..4ad13ee01e 100644 --- a/docs/source/trainer/using_the_trainer.rst +++ b/docs/source/trainer/using_the_trainer.rst @@ -367,44 +367,6 @@ data parallel across 8 GPUs the dataloader should set ``batch_size=256``. Our :doc:`/notes/distributed_training` guide and the :mod:`composer.utils.dist` module. - -DeepSpeed Integration -~~~~~~~~~~~~~~~~~~~~~ - -Composer comes with DeepSpeed support, allowing you to leverage their -full set of features that makes it easier to train large models across -(1) any type of GPU and (2) multiple nodes. For more details on DeepSpeed, -see `their website `__. - -To enable DeepSpeed, simply pass in a config as specified in the -DeepSpeed docs `here `__. - -.. code:: python - - # run_trainer.py - - from composer import Trainer - - trainer = Trainer( - model=model, - train_dataloader=train_dataloader, - eval_dataloader=eval_dataloader, - max_duration='160ep', - device='gpu', - deepspeed_config={ - "train_batch_size": 2048, - "fp16": {"enabled": True}, - }) - -Providing an empty dictionary to DeepSpeed is also valid. The DeepSpeed -defaults will be used and other fields (such as precision) will be inferred -from the trainer. - -.. warning:: - - The ``deepspeed_config`` must not conflict with any other parameters - passed to the trainer. - FSDP Integration (beta) ~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/pyproject.toml b/pyproject.toml index 76f6986c82..685e2450f4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -135,8 +135,7 @@ filterwarnings = [ 'ignore:distutils Version classes are deprecated:DeprecationWarning', # Ignore a UserWarning from TorchMetrics about potentially large memory usage when batch sizes are extremely large 'ignore:Metric `SpearmanCorrcoef` will save all targets and predictions in the buffer:UserWarning:torchmetrics', - # Ignore a UserWarning from torch 1.12 due to DeepSpeed's use of positional args - 'ignore:Positional args are being deprecated, use kwargs instead.*:UserWarning', + # Ignore a private function warnings for torch distributed collectives 'ignore:torch.distributed._all_gather_base is a private function and will be deprecated.*:UserWarning', 'ignore:torch.distributed._reduce_scatter_base is a private function and will be deprecated.*:UserWarning', # Ignore tensorboard deprecation warnings @@ -162,8 +161,6 @@ filterwarnings = [ '''ignore:Please use DTensor instead and we are deprecating ShardedTensor.:UserWarning''', # Ignore torch pytree deprecated warnings '''ignore:torch.utils._pytree._register_pytree_node is deprecated.*:UserWarning''', - # Ignore autograd kernel warning inside DeepSpeed - '''ignore:.*an autograd kernel was not registered to the Autograd key.*:UserWarning''', # Ignore save_state_dict / load_state_dict deprecation warnings '''ignore:'.*_state_dict' is deprecated and will be removed in future versions.*:UserWarning''', # Ignore mlflow warnings about transformers versions, @@ -174,10 +171,6 @@ filterwarnings = [ '''ignore::composer.utils.warnings.VersionedDeprecationWarning''', # Ignore deprecation warning for torch.load '''ignore:You are using `torch.load` with `weights_only=False`.*:FutureWarning''', - # Ignore deprecation warning as DeepSpeed uses old path - '''ignore:.*torch.cuda.amp.custom.*:FutureWarning''', - # DeepSpeed uses positional arguments for PyTorch, ignore the warning - '''ignore:.*Positional args are being deprecated, use kwargs instead.*:FutureWarning''', # PyTorch uses their own deprecated function '''ignore:.*FSDP.state_dict_type.*:FutureWarning''', # Ignore PyTorch NO_SHARD deprecation diff --git a/setup.py b/setup.py index 12c81f3987..2cb266f599 100644 --- a/setup.py +++ b/setup.py @@ -140,7 +140,7 @@ def package_files(prefix: str, directory: str, extension: str): 'GitPython==3.1.43', 'moto[s3]>=5.0.1,<6', 'mock-ssh-server==0.9.1', - 'cryptography==43.0.3', + 'cryptography==44.0.0', 'pytest-httpserver>=1.0.4,<1.1', 'setuptools<=59.5.0', ] @@ -153,12 +153,6 @@ def package_files(prefix: str, directory: str, extension: str): 'slack_sdk>=3.19.5,<4', } -extra_deps['deepspeed'] = [ - 'numpy<2', - 'deepspeed==0.8.3', - 'pydantic>=1.0,<2', -] - extra_deps['wandb'] = [ 'wandb>=0.13.2,<0.19', ] @@ -190,7 +184,7 @@ def package_files(prefix: str, directory: str, extension: str): ] extra_deps['sentencepiece'] = [ - 'protobuf<5.29', + 'protobuf<5.30', 'sentencepiece==0.2.0', ] diff --git a/tests/algorithms/test_algorithm_resumption.py b/tests/algorithms/test_algorithm_resumption.py index e0b14ec198..ccf3e7b2b6 100644 --- a/tests/algorithms/test_algorithm_resumption.py +++ b/tests/algorithms/test_algorithm_resumption.py @@ -28,94 +28,102 @@ def test_algorithm_resumption( alg_cls: type[Algorithm], world_size, ): - folder1 = os.path.join(tmp_path, 'folder1') - folder2 = os.path.join(tmp_path, 'folder2') - os.makedirs(folder1, exist_ok=True) - os.makedirs(folder2, exist_ok=True) - - model = get_alg_model(alg_cls) - alg_kwargs = get_alg_kwargs(alg_cls) - - copied_model = copy.deepcopy(model) # copy the model so the params will start from the same point - - if alg_cls is LayerFreezing: - pytest.xfail('Known issues') - - if alg_cls in (SAM, StochasticDepth): - pytest.xfail('Mismatch in weights when resuming from a checkpoint.') - - if alg_cls is GyroDropout: - pytest.xfail('GyroDropoutLayer is not implemented in a way that allows correct resumption.') - - if alg_cls is SWA and world_size > 1: - pytest.xfail('SWA is not implemented in a way that is compatible correct resumption on multiple devices.') - - optimizer = torch.optim.Adam(model.parameters(), lr=0.01) - scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5) - - shared_config = { - 'max_duration': '2ep', - 'save_filename': 'ep{epoch}-rank{rank}', - 'save_interval': '1ep', - 'train_subset_num_batches': 2, - 'precision': 'amp_bf16', - } - train_dataloader = get_alg_dataloader(alg_cls) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True) - # train model once, saving checkpoints every epoch - trainer1 = Trainer( - model=model, - train_dataloader=train_dataloader, - optimizers=optimizer, - schedulers=scheduler, - save_folder=folder1, - algorithms=alg_cls(**alg_kwargs), - **shared_config, - ) - trainer1.fit() - - # create second trainer, load an intermediate checkpoint - # and continue training - - optimizer = torch.optim.Adam(copied_model.parameters(), lr=0.01) - scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5) - - alg = alg_cls(**alg_kwargs) - # SeqLengthWarmup has a call to ._activate_model() that happens on the first call to the algorithm - # in order to get complete matching of the rng state, we have to cause that extra call to be skipped - # when reloading. - if alg_cls is SeqLengthWarmup: - alg._activated = True # type: ignore - - train_dataloader = get_alg_dataloader(alg_cls) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True) - trainer2 = Trainer( - model=copied_model, - train_dataloader=train_dataloader, - load_path=os.path.join(folder1, 'ep1-rank{rank}'), - load_weights_only=False, - load_strict_model_weights=False, - optimizers=optimizer, - schedulers=scheduler, - save_folder=folder2, - algorithms=alg, - **shared_config, - ) - trainer2.fit() - # check that the checkpoints are equal - if world_size == 1 or dist.get_global_rank() == 0: - _assert_checkpoints_equal( - file1=os.path.join(folder1, 'ep2-rank0'), - file2=os.path.join(folder2, 'ep2-rank0'), + # Use RAM-based tmp directory instead of disk + from tempfile import TemporaryDirectory + with TemporaryDirectory() as tmpdir: + folder1 = os.path.join(tmpdir, 'folder1') + folder2 = os.path.join(tmpdir, 'folder2') + os.makedirs(folder1, exist_ok=True) + os.makedirs(folder2, exist_ok=True) + + if alg_cls is LayerFreezing: + pytest.xfail('Known issues') + + if alg_cls in (SAM, StochasticDepth): + pytest.xfail('Mismatch in weights when resuming from a checkpoint.') + + if alg_cls is GyroDropout: + pytest.xfail('GyroDropoutLayer is not implemented in a way that allows correct resumption.') + + if alg_cls is SWA and world_size > 1: + pytest.xfail('SWA is not implemented in a way that is compatible correct resumption on multiple devices.') + + model = get_alg_model(alg_cls) + alg_kwargs = get_alg_kwargs(alg_cls) + + copied_model = copy.deepcopy(model) # copy the model so the params will start from the same point + + optimizer = torch.optim.Adam(model.parameters(), lr=0.01) + scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1) + + # Reduce training duration and data + shared_config = { + 'max_duration': '2ba', + 'save_filename': 'checkpoint_ba{batch}-rank{rank}', + 'save_interval': '1ba', + 'train_subset_num_batches': 2, + 'precision': 'amp_bf16', + } + train_dataloader = get_alg_dataloader( + alg_cls, + ) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True) + # train model once, saving checkpoints every epoch + trainer1 = Trainer( + model=model, + train_dataloader=train_dataloader, + optimizers=optimizer, + schedulers=scheduler, + save_folder=folder1, + algorithms=alg_cls(**alg_kwargs), + **shared_config, ) + trainer1.fit() + + # create second trainer, load an intermediate checkpoint + # and continue training + + optimizer = torch.optim.Adam(copied_model.parameters(), lr=0.1) + scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1) + + alg = alg_cls(**alg_kwargs) + # SeqLengthWarmup has a call to ._activate_model() that happens on the first call to the algorithm + # in order to get complete matching of the rng state, we have to cause that extra call to be skipped + # when reloading. + if alg_cls is SeqLengthWarmup: + alg._activated = True # type: ignore + train_dataloader = get_alg_dataloader( + alg_cls, + ) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True) + + trainer2 = Trainer( + model=copied_model, + train_dataloader=train_dataloader, + load_path=os.path.join(folder1, 'checkpoint_ba1-rank{rank}'), + load_weights_only=False, + load_strict_model_weights=False, + optimizers=optimizer, + schedulers=scheduler, + save_folder=folder2, + algorithms=alg, + **shared_config, + ) + trainer2.fit() - # check that different epoch checkpoints are _not_ equal - # this ensures that the model weights are being updated. - if world_size == 1 or dist.get_global_rank() == 0: - with pytest.raises(AssertionError): - _assert_model_weights_equal( - file1=os.path.join(folder1, 'ep1-rank0'), - file2=os.path.join(folder1, 'ep2-rank0'), + # check that the checkpoints are equal + if world_size == 1 or dist.get_global_rank() == 0: + _assert_checkpoints_equal( + os.path.join(folder1, 'checkpoint_ba2-rank0'), + os.path.join(folder2, 'checkpoint_ba2-rank0'), ) + # check that different epoch checkpoints are _not_ equal + # this ensures that the model weights are being updated. + with pytest.raises(AssertionError): + _assert_model_weights_equal( + os.path.join(folder1, 'checkpoint_ba1-rank0'), + os.path.join(folder1, 'checkpoint_ba2-rank0'), + ) + def _assert_checkpoints_equal(file1, file2): # TODO: consider merging with _assert_checkpoints_equivalent diff --git a/tests/algorithms/test_algorithms_train.py b/tests/algorithms/test_algorithms_train.py index a73a649b70..0f9cb32de2 100644 --- a/tests/algorithms/test_algorithms_train.py +++ b/tests/algorithms/test_algorithms_train.py @@ -18,7 +18,7 @@ def test_algorithm_trains(alg_cls: type[Algorithm]): trainer = Trainer( model=model, train_dataloader=dataloader, - max_duration='2ep', + max_duration='2ba', algorithms=alg_cls(**alg_kwargs), ) trainer.fit() @@ -34,5 +34,5 @@ def test_algorithm_trains(alg_cls: type[Algorithm]): 'GyroDropout is implemented to be applied on Event.FIT_START, so is not compatible with multiple calls to fit.', ) - # fit again for another epoch - trainer.fit(duration='1ep') + # fit again for another batch + trainer.fit(duration='1ba') diff --git a/tests/algorithms/test_gradient_clipping.py b/tests/algorithms/test_gradient_clipping.py index d749362801..1b1336f598 100644 --- a/tests/algorithms/test_gradient_clipping.py +++ b/tests/algorithms/test_gradient_clipping.py @@ -20,7 +20,7 @@ def simple_model_with_grads(): # Set up small NN with one linear layer with no bias + softmax, so only # one set of params and get some gradients. - N, hin, num_classes = 8, 4, 3 + N, hin, num_classes = 4, 2, 2 x = torch.rand((N, hin)) y = torch.randint(high=num_classes - 1, size=(N,)) model = nn.Sequential(nn.Linear(hin, num_classes, bias=False), nn.Softmax(dim=1)) @@ -47,8 +47,6 @@ def __init__(self, n_ch, num_fmaps, h, num_classes, filter_size): self.mlp = nn.Sequential( nn.Linear(num_fmaps, h), nn.ReLU(), - nn.Linear(h, h), - nn.ReLU(), nn.Linear(h, num_classes), nn.Softmax(dim=1), ) @@ -60,8 +58,8 @@ def forward(self, x): return out # Generate some gradients. - N, n_ch, num_fmaps, h, num_classes, filter_size = 8, 3, 4, 4, 3, 3 - x = torch.rand((N, n_ch, 16, 16)) + N, n_ch, num_fmaps, h, num_classes, filter_size = 4, 1, 2, 2, 2, 2 + x = torch.rand((N, n_ch, 8, 8)) y = torch.randint(high=num_classes - 1, size=(N,)) model = myNN(n_ch, num_fmaps, h, num_classes, filter_size) @@ -160,50 +158,6 @@ def test_gradient_clipping_algorithm(monkeypatch, clipping_type, model_with_grad apply_gc_fn.assert_called_once() -@pytest.mark.parametrize( - 'model_with_grads', - [ - simple_model_with_grads(), - cnn_model_with_grads(), - simple_transformer_model_with_grads(), - hf_model_with_grads(), - ], -) -def test_gradient_clipping_algorithm_with_deepspeed_enabled( - monkeypatch: pytest.MonkeyPatch, - model_with_grads, - dummy_state: State, -): - clipping_threshold = 0.1191 - apply_gc_fn = Mock() - monkeypatch.setattr(gc_module, 'apply_gradient_clipping', apply_gc_fn) - state = dummy_state - - # Set clipping_type to norm to ensure that apply_gradient_clipping - # is not called. - state.algorithms = [GradientClipping(clipping_type='norm', clipping_threshold=clipping_threshold)] - - # Enable deepspeed. - state.deepspeed_config = {} - - model = model_with_grads - state.model = model - logger = Mock() - engine = Engine(state, logger) - - # Run the Event that should cause gradient_clipping.apply to be called and deepspeed_config to be modified. - engine.run_event(Event.INIT) - - # Make sure deepspeed_config's gradient_clipping field is set properly. - assert ( - 'gradient_clipping' in state.deepspeed_config and - state.deepspeed_config['gradient_clipping'] == clipping_threshold - ) - - # Make sure apply_gradient_clipping is not called. - apply_gc_fn.assert_not_called() - - def _auto_wrap_policy(module: torch.nn.Module, recurse: bool, nonwrapped_numel: int) -> bool: if recurse: return True @@ -261,40 +215,6 @@ def test_gradient_clipping_algorithm_with_fsdp_enabled_does_not_error( engine.run_event(Event.AFTER_TRAIN_BATCH) -@pytest.mark.parametrize( - 'model_with_grads', - [simple_model_with_grads, cnn_model_with_grads, simple_transformer_model_with_grads, hf_model_with_grads], -) -def test_algorithm_with_deepspeed_enabled_errors_out_for_non_norm( - monkeypatch: pytest.MonkeyPatch, - dummy_state: State, - model_with_grads, -): - clipping_threshold = 0.1191 - apply_gc_fn = Mock() - monkeypatch.setattr(gc_module, 'apply_gradient_clipping', apply_gc_fn) - state = dummy_state - - # Enable deepspeed and set clipping_type to norm to ensure that apply_gradient_clipping - # is not called. - state.algorithms = [GradientClipping(clipping_type='value', clipping_threshold=clipping_threshold)] - state.deepspeed_config = {} - - model = model_with_grads() - state.model = model - logger = Mock() - engine = Engine(state, logger) - - # Clipping type is not set to norm and deepspeed is enabled so NotImplementedError should be raised. - with pytest.raises(NotImplementedError): - engine.run_event(Event.INIT) - - # Clipping threshold is less than zero and deepspeed is enabled so NotImplementedError should be raised. - state.algorithms = [GradientClipping(clipping_type='norm', clipping_threshold=-2.0)] - with pytest.raises(ValueError): - engine.run_event(Event.INIT) - - #### Tests Specific to AGC ###### diff --git a/tests/algorithms/test_sam.py b/tests/algorithms/test_sam.py index 11e386cedc..30a7333525 100644 --- a/tests/algorithms/test_sam.py +++ b/tests/algorithms/test_sam.py @@ -59,11 +59,9 @@ def dict_loss(outputs, targets, *args, **kwargs): @pytest.mark.filterwarnings('ignore::UserWarning') class TestSAMParamGroups(): - @pytest.fixture(params=['FSDP', 'DeepSpeed']) + @pytest.fixture def config(self, request): - distributed_mode = request.param - train_dataset = RandomClassificationDataset(size=16) scheduler = CosineAnnealingWithWarmupScheduler( @@ -90,20 +88,14 @@ def config(self, request): 'amp_bf16', 'parallelism_config': None, - 'deepspeed_config': - None, } - if distributed_mode == 'FSDP': - config_dict['parallelism_config'] = {'fsdp': {'sharding_strategy': 'NO_SHARD'}} - else: - config_dict['deepspeed_config'] = {'prescale_gradients': True} + config_dict['parallelism_config'] = {'fsdp': {'sharding_strategy': 'NO_SHARD'}} # Simulate world_size checking as in LLMFoundry. See: # * https://github.com/mosaicml/llm-foundry/blob/bfbb8c57053eaa3cb99a5d51ba602d1a6c872aa7/scripts/train/train.py#L519-L523 - if dist.get_world_size( - ) == 1 and (config_dict['parallelism_config'] is not None or config_dict['deepspeed_config'] is not None): - config_dict['parallelism_config'] = config_dict['deepspeed_config'] = None + if dist.get_world_size() == 1: + config_dict['parallelism_config'] = None return config_dict diff --git a/tests/common/state.py b/tests/common/state.py index de6948c851..77783fe0d0 100644 --- a/tests/common/state.py +++ b/tests/common/state.py @@ -4,7 +4,6 @@ from typing import Any from composer.core import State -from composer.utils import is_model_deepspeed from tests.common.compare import deep_compare @@ -18,7 +17,6 @@ def _del_wct_timestamp_fields(timestamp_state_dict: dict[str, Any]): def assert_state_equivalent(state1: State, state2: State): """Assert that ``state1`` is equivalent to ``state2``, ignoring wall clock timestamp fields.""" assert state1.serialized_attributes == state2.serialized_attributes - assert is_model_deepspeed(state1.model) == is_model_deepspeed(state2.model) # Using a loose tolerance for GPU states as GPU determinism does not work properly is_gpu = next(state1.model.parameters()).device.type == 'cuda' diff --git a/tests/test_events.py b/tests/test_events.py index 235d0941f1..d6407df2d7 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -1,7 +1,8 @@ -# Copyright 2022 MosaicML Composer authors +# Copyright 2024 MosaicML Composer authors # SPDX-License-Identifier: Apache-2.0 import math +from unittest.mock import patch import pytest import torch @@ -22,27 +23,31 @@ def test_event_values(event: Event): class TestEventCalls: - eval_subset_num_batches = 2 - train_subset_num_batches = 2 + eval_subset_num_batches = 1 + train_subset_num_batches = 1 - def get_trainer(self, precision='fp32', **kwargs): + def get_trainer(self, precision='fp32', max_duration='1ep', save_interval='1ep', **kwargs): model = SimpleModel() optimizer = torch.optim.Adam(model.parameters()) - train_dataset = RandomClassificationDataset() - eval_dataset = RandomClassificationDataset() + train_dataset = RandomClassificationDataset(size=16) + eval_dataset = RandomClassificationDataset(size=16) train_batch_size = 4 evaluator1 = DataLoader( dataset=eval_dataset, batch_size=8, sampler=dist.get_sampler(eval_dataset), + num_workers=0, + drop_last=True, ) evaluator2 = DataLoader( dataset=eval_dataset, batch_size=4, sampler=dist.get_sampler(eval_dataset), + num_workers=0, + drop_last=True, ) return Trainer( @@ -51,13 +56,15 @@ def get_trainer(self, precision='fp32', **kwargs): dataset=train_dataset, batch_size=train_batch_size, sampler=dist.get_sampler(train_dataset), + num_workers=0, ), eval_dataloader=(evaluator1, evaluator2), device_train_microbatch_size=train_batch_size // 2, precision=precision, train_subset_num_batches=self.train_subset_num_batches, eval_subset_num_batches=self.eval_subset_num_batches, - max_duration='2ep', + max_duration=max_duration, + save_interval=save_interval, optimizers=optimizer, callbacks=[EventCounterCallback()], **kwargs, @@ -71,13 +78,12 @@ def get_trainer(self, precision='fp32', **kwargs): ], ) @pytest.mark.parametrize( - 'device,deepspeed_zero_stage,use_fsdp,precision', + 'device,use_fsdp,precision', [ - pytest.param('cpu', None, False, 'fp32', id='cpu-ddp'), + pytest.param('cpu', False, 'fp32', id='cpu-ddp'), # TODO: Remove filterwarnings after FSDP remove deprecated code pytest.param( 'gpu', - True, False, 'fp32', id='gpu-ddp', @@ -88,7 +94,6 @@ def get_trainer(self, precision='fp32', **kwargs): ), pytest.param( 'gpu', - None, True, 'amp_fp16', id='gpu-fsdp', @@ -100,12 +105,39 @@ def get_trainer(self, precision='fp32', **kwargs): ], ) @pytest.mark.parametrize('save_interval', ['1ep', '1ba']) - def test_event_calls(self, world_size, device, deepspeed_zero_stage, use_fsdp, precision, save_interval): - save_interval = Time.from_timestring(save_interval) - - deepspeed_config = None - if deepspeed_zero_stage: - deepspeed_config = {'zero_optimization': {'stage': deepspeed_zero_stage}} + def test_event_calls(self, world_size, device, use_fsdp, precision, save_interval): + # handle 1ba save interval separately to optimize speed + if save_interval == '1ba': + # mock the save_checkpoint method to speed up batch saves + with patch('composer.trainer.trainer.Trainer.save_checkpoint') as mock_save: + mock_save.return_value = None + self._run_event_calls_test( + world_size, + device, + use_fsdp, + precision, + save_interval, + num_epochs=1, + ) + else: + self._run_event_calls_test( + world_size, + device, + use_fsdp, + precision, + save_interval, + num_epochs=1, + ) + + def _run_event_calls_test( + self, + world_size, + device, + use_fsdp, + precision, + save_interval, + num_epochs, + ): parallelism_config = None if use_fsdp: @@ -120,14 +152,13 @@ def test_event_calls(self, world_size, device, deepspeed_zero_stage, use_fsdp, p trainer = self.get_trainer( precision=precision, device=device, - deepspeed_config=deepspeed_config, parallelism_config=parallelism_config, save_interval=save_interval, - eval_interval=save_interval, + eval_interval=Time.from_timestring(save_interval), ) trainer.fit() - self._assert_expected_event_calls(trainer, save_interval, num_epochs=2) + self._assert_expected_event_calls(trainer, Time.from_timestring(save_interval), num_epochs=num_epochs) def _assert_expected_event_calls(self, trainer: Trainer, eval_interval: Time, num_epochs: int): state = trainer.state diff --git a/tests/test_full_nlp.py b/tests/test_full_nlp.py index 14380b38fe..7afb8cc08c 100644 --- a/tests/test_full_nlp.py +++ b/tests/test_full_nlp.py @@ -9,6 +9,7 @@ from packaging import version from torch.utils.data import DataLoader from torchmetrics.classification import MulticlassAccuracy +from transformers import BertConfig, BertForMaskedLM, BertForSequenceClassification, BertTokenizerFast from composer.algorithms import GatedLinearUnits from composer.loggers import RemoteUploaderDownloader @@ -35,22 +36,22 @@ def pretraining_test_helper(tokenizer, model, algorithms, tmp_path, device): pretraining_model_copy = copy.deepcopy(model) pretraining_train_dataset = RandomTextLMDataset( - size=8, + size=4, vocab_size=tokenizer.vocab_size, - sequence_length=4, + sequence_length=2, use_keys=True, ) collator = transformers.DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm_probability=0.15) pretraining_train_dataloader = DataLoader( pretraining_train_dataset, - batch_size=4, + batch_size=2, sampler=dist.get_sampler(pretraining_train_dataset), collate_fn=collator, ) pretraining_eval_dataloader = DataLoader( pretraining_train_dataset, - batch_size=4, + batch_size=2, sampler=dist.get_sampler(pretraining_train_dataset), collate_fn=collator, ) @@ -59,7 +60,7 @@ def pretraining_test_helper(tokenizer, model, algorithms, tmp_path, device): model=pretraining_model_copy, train_dataloader=pretraining_train_dataloader, save_folder=str(tmp_path / 'pretraining_checkpoints'), - max_duration='1ep', + max_duration='2ba', seed=17, algorithms=algorithms, device=device, @@ -91,20 +92,20 @@ def finetuning_test_helper(tokenizer, model, algorithms, checkpoint_path, pretra finetuning_model_copy = copy.deepcopy(model) finetuning_train_dataset = RandomTextClassificationDataset( - size=8, + size=4, vocab_size=tokenizer.vocab_size, - sequence_length=4, + sequence_length=2, num_classes=3, use_keys=isinstance(model, HuggingFaceModel), ) finetuning_train_dataloader = DataLoader( finetuning_train_dataset, - batch_size=4, + batch_size=2, sampler=dist.get_sampler(finetuning_train_dataset), ) finetuning_eval_dataloader = DataLoader( finetuning_train_dataset, - batch_size=4, + batch_size=2, sampler=dist.get_sampler(finetuning_train_dataset), ) @@ -137,7 +138,7 @@ def finetuning_test_helper(tokenizer, model, algorithms, checkpoint_path, pretra load_weights_only=True, load_strict_model_weights=False, loggers=[rud], - max_duration='1ep', + max_duration='2ba', seed=17, algorithms=algorithms, device=device, @@ -229,7 +230,6 @@ def inference_test_helper( @device('cpu', 'gpu') -# Note: the specificity of these settings are due to incompatibilities (e.g. the simpletransformer model is not traceable) @pytest.mark.parametrize( 'model_type,algorithms,save_format', [ @@ -242,10 +242,8 @@ def test_full_nlp_pipeline( model_type, algorithms, save_format, - tiny_bert_tokenizer, onnx_opset_version, tmp_path, - request, device, ): """This test is intended to exercise our full pipeline for NLP. @@ -260,29 +258,33 @@ def test_full_nlp_pipeline( pytest.skip("Don't test prior PyTorch version's default Opset version.") algorithms = [algorithm() for algorithm in algorithms] - device = get_device(device) - - tiny_bert_model = None - if model_type == 'tinybert_hf': - tiny_bert_model = request.getfixturevalue('tiny_bert_model') - - # pretraining + config = None + tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased', model_max_length=128) if model_type == 'tinybert_hf': - assert tiny_bert_model is not None + # Updated minimal BERT configuration + config = BertConfig( + vocab_size=30522, + hidden_size=16, + num_hidden_layers=2, + num_attention_heads=2, + intermediate_size=64, + num_labels=3, + ) + tiny_bert_model = BertForMaskedLM(config) pretraining_metrics = [LanguageCrossEntropy(ignore_index=-100), MaskedAccuracy(ignore_index=-100)] pretraining_model = HuggingFaceModel( tiny_bert_model, - tiny_bert_tokenizer, + tokenizer, use_logits=True, metrics=pretraining_metrics, ) elif model_type == 'simpletransformer': - pretraining_model = SimpleTransformerMaskedLM(vocab_size=tiny_bert_tokenizer.vocab_size) + pretraining_model = SimpleTransformerMaskedLM(vocab_size=30522) else: raise ValueError('Unsupported model type') pretraining_output_path = pretraining_test_helper( - tiny_bert_tokenizer, + tokenizer, pretraining_model, algorithms, tmp_path, @@ -292,25 +294,23 @@ def test_full_nlp_pipeline( # finetuning if model_type == 'tinybert_hf': finetuning_metric = MulticlassAccuracy(num_classes=3, average='micro') - hf_finetuning_model, _ = HuggingFaceModel.hf_from_composer_checkpoint( - pretraining_output_path, - model_instantiation_class='transformers.AutoModelForSequenceClassification', - model_config_kwargs={'num_labels': 3}, - ) finetuning_model = HuggingFaceModel( - model=hf_finetuning_model, - tokenizer=tiny_bert_tokenizer, + model=BertForSequenceClassification(config), + tokenizer=tokenizer, use_logits=True, metrics=[finetuning_metric], ) elif model_type == 'simpletransformer': - finetuning_model = SimpleTransformerClassifier(vocab_size=tiny_bert_tokenizer.vocab_size, num_classes=3) + finetuning_model = SimpleTransformerClassifier( + vocab_size=30522, + num_classes=3, + ) else: raise ValueError('Unsupported model type.') finetuning_model_copy = copy.deepcopy(finetuning_model) finetuning_trainer, finetuning_dataloader, rud, finetuning_output_path = finetuning_test_helper( - tiny_bert_tokenizer, + tokenizer, finetuning_model, algorithms, pretraining_output_path, diff --git a/tests/trainer/test_checkpoint.py b/tests/trainer/test_checkpoint.py index 8ae247fabf..9aaad137bd 100644 --- a/tests/trainer/test_checkpoint.py +++ b/tests/trainer/test_checkpoint.py @@ -120,15 +120,14 @@ def _assert_checkpoints_equivalent(file1, file2, atol=0.0, rtol=0.0): deep_compare(checkpoint_1, checkpoint_2, atol=atol, rtol=rtol) - # deepspeed checkpoints do not have model or optimizer - # so either model, optimizer should be in all checkpoints or in none + # Model and optimizer should be in all checkpoints keys_in = ( 'model' in checkpoint_1['state'], 'optimizers' in checkpoint_1['state'], 'model' in checkpoint_2['state'], 'optimizers' in checkpoint_2['state'], ) - assert all(keys_in) or not any(keys_in) + assert all(keys_in) @pytest.mark.parametrize( @@ -1543,13 +1542,10 @@ def __len__(self) -> int: ], ) @pytest.mark.parametrize( - 'device,deepspeed_zero_stage', + 'device', [ - pytest.param('cpu', None, id='cpu-ddp'), - pytest.param('gpu', None, id='gpu-ddp', marks=pytest.mark.gpu), - pytest.param('gpu', 0, id='deepspeed-zero0', marks=pytest.mark.gpu), - pytest.param('gpu', 1, id='deepspeed-zero1', marks=pytest.mark.gpu), - pytest.param('gpu', 2, id='deepspeed-zero2', marks=pytest.mark.gpu), + pytest.param('cpu', id='cpu-ddp'), + pytest.param('gpu', id='gpu-ddp', marks=pytest.mark.gpu), ], ) @pytest.mark.parametrize( @@ -1600,7 +1596,6 @@ def test_resumption( self, device: str, world_size: int, - deepspeed_zero_stage: Optional[int], save_interval: str, save_filename: str, resume_file: str, @@ -1613,23 +1608,11 @@ def test_resumption( tmp_paths = dist.all_gather_object(os.path.abspath(tmp_path)) save_folder = pathlib.Path(tmp_paths[0]) - if deepspeed_zero_stage: - deepspeed_config = {'zero_optimization': {'stage': deepspeed_zero_stage}} - - # save_checkpoint appends .tar for deepspeed - if not is_tar(resume_file): - resume_file += '.tar' - if not is_tar(final_checkpoint): - final_checkpoint += '.tar' - else: - deepspeed_config = None - trainer_1 = self.get_trainer( save_folder=os.path.join(save_folder, 'first'), save_filename=save_filename, save_interval=save_interval, eval_interval=save_interval, - deepspeed_config=deepspeed_config, seed=seed, device=device, ) @@ -1642,12 +1625,10 @@ def test_resumption( save_interval=save_interval, num_epochs=2, # set in get_trainer() num_batches_per_epoch=5, # set in get_trainer() - is_deepspeed=deepspeed_config is not None, ) - if not deepspeed_config: - # for DDP training, only rank 0 saves - resume_file = resume_file.format(rank=0) + # for DDP training, only rank 0 saves + resume_file = resume_file.format(rank=0) resume_file = os.path.join(save_folder, 'first', resume_file) @@ -1656,7 +1637,6 @@ def test_resumption( save_filename=save_filename, save_interval=save_interval, eval_interval=save_interval, - deepspeed_config=deepspeed_config, seed=seed, device=device, load_path=resume_file, # <-- resume training from file @@ -1797,7 +1777,6 @@ def _assert_expected_num_checkpoints( save_interval: str, num_epochs: int, num_batches_per_epoch: int, - is_deepspeed: bool, ): interval = Time.from_timestring(save_interval) if interval.unit == TimeUnit.EPOCH: @@ -1806,10 +1785,6 @@ def _assert_expected_num_checkpoints( expected_num_files = ((num_batches_per_epoch * num_epochs - 1) // interval.value) + 1 expected_num_files += 1 # account for symlink - if is_deepspeed: - # each rank saves - expected_num_files *= dist.get_world_size() - files = os.listdir(save_folder) assert len(files) == expected_num_files @@ -1823,20 +1798,15 @@ def _assert_expected_num_checkpoints( ) @pytest.mark.parametrize('num_keep', list(range(-1, 5))) @pytest.mark.parametrize( - 'device,deepspeed_enabled,zero_stage', + 'device', [ - pytest.param('cpu', False, None, id='cpu-ddp'), - pytest.param('gpu', False, None, id='gpu-ddp', marks=pytest.mark.gpu), - pytest.param('gpu', True, 0, id='deepspeed-zero0', marks=pytest.mark.gpu), - pytest.param('gpu', True, 1, id='deepspeed-zero1', marks=pytest.mark.gpu), - pytest.param('gpu', True, 2, id='deepspeed-zero2', marks=pytest.mark.gpu), + pytest.param('cpu', id='cpu-ddp'), + pytest.param('gpu', id='gpu-ddp', marks=pytest.mark.gpu), ], ) def test_rotate_checkpoints( world_size, device, - deepspeed_enabled, - zero_stage, num_keep, tmp_path: pathlib.Path, ): @@ -1844,10 +1814,6 @@ def test_rotate_checkpoints( tmp_paths = dist.all_gather_object(os.path.abspath(tmp_path)) save_folder = tmp_paths[0] - deepseed_config = None - if deepspeed_enabled: - deepseed_config = {'zero_optimization': {'stage': zero_stage}} - train_dataset = RandomImageDataset() trainer = Trainer( @@ -1863,22 +1829,21 @@ def test_rotate_checkpoints( max_duration='6ba', save_num_checkpoints_to_keep=num_keep, device=device, - deepspeed_config=deepseed_config, ) trainer.fit() dist.barrier() # ensure all checkpoints rotated across ranks - # deepspeed saves 1 file per rank + # Only rank 0 saves files total_checkpoints = 6 num_keep = num_keep if num_keep >= 0 else total_checkpoints - expected_num = num_keep if not deepspeed_enabled else num_keep * world_size + expected_num = num_keep files = glob(os.path.join(save_folder, 'checkpoint_*')) symlink_files = glob(os.path.join(save_folder, 'latest-rank*')) assert len(files) == expected_num - assert len(symlink_files) == ((1 if not deepspeed_enabled else world_size) if num_keep != 0 else 0) + assert len(symlink_files) == (1 if num_keep != 0 else 0) dist.barrier() # all ranks finish before cleaning up tmpdir diff --git a/tests/trainer/test_ddp.py b/tests/trainer/test_ddp.py index 9241482456..348a9aeb24 100644 --- a/tests/trainer/test_ddp.py +++ b/tests/trainer/test_ddp.py @@ -80,15 +80,12 @@ def run_event(self, event: Event, state: State, logger: Logger) -> None: @pytest.mark.parametrize( - 'device,deepspeed,fsdp', + 'device,fsdp', [ - pytest.param('cpu', False, False, id='cpu'), - pytest.param('gpu', False, False, id='gpu', marks=pytest.mark.gpu), - # TODO: Remove filterwarnings after FSDP removes deprecated code - pytest.param('gpu', True, False, id='deepspeed', marks=pytest.mark.gpu), + pytest.param('cpu', False, id='cpu'), + pytest.param('gpu', False, id='gpu', marks=pytest.mark.gpu), pytest.param( 'gpu', - False, True, id='fsdp', marks=[ @@ -105,7 +102,7 @@ def run_event(self, event: Event, state: State, logger: Logger) -> None: pytest.param(2, marks=pytest.mark.world_size(2)), ], ) -def test_ddp(device: str, world_size: int, deepspeed: bool, fsdp: bool, tmp_path: pathlib.Path) -> None: +def test_ddp(device: str, world_size: int, fsdp: bool, tmp_path: pathlib.Path) -> None: """test strategy for ddp: 1) Train a dummy model on two gps, for two epochs, using the tracked dataset. 2) The tracked dataset should record two -- and only two -- accesses for each sample -- one for each epoch If each sample is accessed more than this number of times, then the distributed sampler isn't working properly If each sample is @@ -184,7 +181,6 @@ def test_ddp(device: str, world_size: int, deepspeed: bool, fsdp: bool, tmp_path eval_interval='1ep', eval_subset_num_batches=eval_subset_num_batches, train_subset_num_batches=train_subset_num_batches, - deepspeed_config={} if deepspeed else None, parallelism_config=parallelism_config, callbacks=[CheckBatch0(tmp_path)], ) @@ -203,9 +199,8 @@ def test_ddp(device: str, world_size: int, deepspeed: bool, fsdp: bool, tmp_path assert expected_train_samples == actual_train_samples assert expected_val_samples == actual_val_samples - if not deepspeed: - _assert_inputs_different(tmp_path, max_epochs, is_train=True) - _assert_inputs_different(tmp_path, max_epochs, is_train=False) + _assert_inputs_different(tmp_path, max_epochs, is_train=True) + _assert_inputs_different(tmp_path, max_epochs, is_train=False) def _read_tracked_results(path, is_train): diff --git a/tests/trainer/test_fsdp_checkpoint.py b/tests/trainer/test_fsdp_checkpoint.py index 332ed5b7b7..9c107231ba 100644 --- a/tests/trainer/test_fsdp_checkpoint.py +++ b/tests/trainer/test_fsdp_checkpoint.py @@ -515,6 +515,8 @@ def test_fsdp_mixed_with_sync( '0.24.0', '0.25.0', '0.26.0', + '0.27.0', + '0.28.0', ], ) @pytest.mark.filterwarnings(r'ignore:.*metrics are not saved with sharded state dict.*:UserWarning') @@ -534,9 +536,12 @@ def test_fsdp_load_old_checkpoint( if composer_version == '0.18.1' and state_dict_type == 'full' and precision == 'amp_bf16' and sharding_strategy == 'FULL_SHARD': pytest.skip('TODO: This checkpoint is missing') - if (composer_version in ['0.22.0', '0.23.0'] and version.parse(torch.__version__) < version.parse('2.3.0')) or ( - composer_version in ['0.24.0', '0.25.0'] and version.parse(torch.__version__) < version.parse('2.4.0') - ) or (composer_version in '0.26.0' and version.parse(torch.__version__) < version.parse('2.5.0')): + if (composer_version in ['0.22.0', '0.23.0'] and version.parse(torch.__version__) < version.parse('2.3.0') + ) or (composer_version in ['0.24.0', '0.25.0'] and + version.parse(torch.__version__) < version.parse('2.4.0')) or ( + composer_version in ['0.26.0', '0.27.0', '0.28.0'] and + version.parse(torch.__version__) < version.parse('2.5.0') + ): pytest.skip('Current torch version is older than torch version that checkpoint was written with.') if composer_version in ['0.13.5', '0.14.0', '0.14.1', '0.15.1']: @@ -1311,7 +1316,6 @@ def test_fsdp_monolith_resumption( save_interval=save_interval, num_epochs=1, # set in get_trainer() num_batches_per_epoch=8, # set in get_trainer() - is_deepspeed=False, ) resume_file = os.path.join(save_folder, 'first', resume_file) diff --git a/tests/trainer/test_trainer.py b/tests/trainer/test_trainer.py index 0c62c5c4cc..5b57a3374d 100644 --- a/tests/trainer/test_trainer.py +++ b/tests/trainer/test_trainer.py @@ -26,7 +26,7 @@ from composer.models import ComposerModel from composer.optim import DecoupledSGDW, ExponentialScheduler from composer.trainer.trainer import _generate_run_name -from composer.utils import dist, is_model_deepspeed, is_model_fsdp, map_collection, reproducibility +from composer.utils import dist, is_model_fsdp, map_collection, reproducibility from tests.common import ( EmptyModel, InfiniteClassificationDataset, @@ -671,29 +671,6 @@ def dummy_fwd(self, *args, **kwargs): with pytest.raises(RuntimeError, match='Encountered non-addressable cuda error while using auto.*'): trainer.fit() - @pytest.mark.gpu - @pytest.mark.parametrize('precision', [Precision.FP32, Precision.AMP_BF16, Precision.AMP_FP16]) - @pytest.mark.filterwarnings('ignore::UserWarning') - def test_deepspeed( - self, - model: ComposerModel, - precision: Precision, - max_duration: Time[int], - train_dataloader: DataLoader, - ): - trainer = Trainer( - model=model, - precision=precision, - deepspeed_config={}, - max_duration=max_duration, - train_dataloader=train_dataloader, - ) - - assert is_model_deepspeed(trainer.state.model) - - assert trainer.state.deepspeed_enabled - trainer.fit() - @pytest.mark.gpu @pytest.mark.parametrize('precision', [Precision.FP32, Precision.AMP_BF16, Precision.AMP_FP16]) @pytest.mark.filterwarnings('ignore::UserWarning') diff --git a/tests/utils/test_autolog_hparams.py b/tests/utils/test_autolog_hparams.py index e93ceb39ff..23aa71f00c 100644 --- a/tests/utils/test_autolog_hparams.py +++ b/tests/utils/test_autolog_hparams.py @@ -174,8 +174,7 @@ def test_extract_hparams_trainer(): # Graceful Resumption 'autoresume': False, - # DeepSpeed - 'deepspeed_config': None, + # Parallelism 'parallelism_config': None, # System/Numerics