Merge pull request #22 from fidelity/ddp_mpi
DDP MPI Support + DataLoader Fix
ncilfone authored Oct 26, 2021
2 parents d5581ac + a5eb779 commit 619042a
Showing 7 changed files with 65 additions and 13 deletions.
README.md (11 changes: 9 additions & 2 deletions)
@@ -51,7 +51,10 @@ and are only conditionally imported).

Follow the instructions [here](https://github.com/NVIDIA/apex#quick-start).

### (Optional) OpenMPI Support
### (Optional) Underlying OpenMPI Support

**Note: MPI support is necessary if you plan to run Stoke across multiple compute nodes (e.g. 2 nodes with 4 GPUs each)
with DDP, Horovod, or DeepSpeed backends**

Follow the instructions [here](https://www.open-mpi.org/faq/?category=building) or
[here](https://edu.itp.phys.ethz.ch/hs12/programming_techniques/openmpi.pdf)
@@ -64,6 +67,10 @@ pip install stoke
```

### via PyPI w/ Optional MPI Support

**Note: MPI support is necessary if you plan to run Stoke across multiple compute nodes (e.g. 2 nodes with 4 GPUs each)
with DDP, Horovod, or DeepSpeed backends**

```bash
pip install stoke[mpi]
```
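As a quick sanity check (not part of the Stoke docs), you can verify that the optional mpi4py extra built against your MPI install by running a tiny script under `mpirun`; every rank should print its rank and the world size.

```python
# check_mpi.py -- hypothetical helper, not shipped with Stoke
# run with: mpirun -np 2 python check_mpi.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
print(f"rank {comm.Get_rank()} of {comm.Get_size()}")
```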
@@ -211,10 +218,10 @@ sampler = DistributedSampler(
)

# Call the DataLoader method on the stoke_obj to correctly create a DataLoader instance
# The DataLoader object already knows the batch size from the Stoke object creation
data_loader = stoke_obj.DataLoader(
    dataset=dataset,
    collate_fn=lambda batch: dataset.collate_fn(batch),
    batch_size=32,
    sampler=sampler,
    num_workers=4
)
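For context on the comment above, a minimal self-contained sketch of the mechanism this PR moves to (illustrative classes only, not Stoke's actual implementation): the batch size is declared once on the owning object, and its `DataLoader` method injects it rather than asking callers to repeat it.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset


# Illustration of the mechanism only -- not Stoke's real classes
class OwnerWithBatchSize:
    def __init__(self, batch_size: int):
        self.batch_size = batch_size  # declared once at construction

    def DataLoader(self, dataset, **kwargs):
        # batch_size is injected from the owner; callers no longer pass it
        return DataLoader(dataset, batch_size=self.batch_size, **kwargs)


owner = OwnerWithBatchSize(batch_size=32)
dataset = TensorDataset(torch.randn(128, 4), torch.randint(0, 2, (128,)))
loader = owner.DataLoader(dataset, num_workers=0, shuffle=False)
assert loader.batch_size == 32
```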
docs/Launchers.md (28 changes: 25 additions & 3 deletions)
@@ -44,9 +44,31 @@ mpirun -np 16 \
```
### Deepspeed w/ OpenMPI
Prefer the OpenMPI version [
here](https://www.deepspeed.ai/getting-started/#multi-node-environment-variables) over the native
launcher. Deepspeed will automatically discover devices, etc. via mpi4py.
Prefer the OpenMPI version [here](https://www.deepspeed.ai/getting-started/#multi-node-environment-variables) over the
native launcher. Deepspeed will automatically discover devices, etc. via mpi4py. Can also be used
with k8s via the [MPI Operator](https://github.com/kubeflow/mpi-operator)
```shell
mpirun -np 4 \
--allow-run-as-root -bind-to none -map-by slot \
-x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH \
-mca pml ob1 -mca btl ^openib \
python train.py
```
or
```shell
mpirun -np 16 \
-H server1:4,server2:4,server3:4,server4:4 \
-bind-to none -map-by slot \
-x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH \
-mca pml ob1 -mca btl ^openib \
python train.py
```
### PyTorch DDP w/ OpenMPI
Leverage Deepspeed functionality to automatically discover devices, etc. via mpi4py. Can also be used
with k8s via the [MPI Operator](https://github.com/kubeflow/mpi-operator)
```shell
mpirun -np 4 \
--allow-run-as-root -bind-to none -map-by slot \
-x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH \
-mca pml ob1 -mca btl ^openib \
python train.py
```
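To use the MPI discovery path this PR adds together with the `mpirun` launch above, the DDP backend can be told to fall back to MPI when the usual `torch.distributed` environment variables are missing. A hedged sketch using only the `DDPConfig` fields visible in this diff (how the config object is then passed to the `Stoke` constructor is not shown here):

```python
from stoke import DDPConfig  # assumes DDPConfig is exported at the package top level

# local_rank=None defers to the LOCAL_RANK env var / --local_arg parsing;
# auto_mpi_discovery=True falls back to mpi4py-based discovery under mpirun
ddp_config = DDPConfig(
    local_rank=None,
    auto_mpi_discovery=True,
    backend="nccl",
)
```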
docs/Quick-Start.md (2 changes: 1 addition & 1 deletion)
@@ -140,10 +140,10 @@ sampler = DistributedSampler(
)

# Call the DataLoader method on the stoke_obj to correctly create a DataLoader instance
# The DataLoader object already knows the batch size from the Stoke object creation
data_loader = stoke_obj.DataLoader(
    dataset=dataset,
    collate_fn=lambda batch: dataset.collate_fn(batch),
    batch_size=32,
    sampler=sampler,
    num_workers=4
)
examples/cifar10/train.py (2 changes: 0 additions & 2 deletions)
@@ -147,7 +147,6 @@ def main():
    # Construct the DataLoader
    train_loader = cifar_stoke.DataLoader(
        dataset=training_dataset,
        batch_size=configs.DataConfig.batch_size,
        sampler=train_sampler,
        num_workers=configs.DataConfig.n_workers
        if configs.DataConfig.n_workers is not None
@@ -165,7 +164,6 @@ )
    )
    test_loader = cifar_stoke.DataLoader(
        dataset=test_dataset,
        batch_size=configs.DataConfig.batch_size,
        sampler=test_sampler,
        num_workers=configs.DataConfig.n_workers
        if configs.DataConfig.n_workers is not None
stoke/configs.py (4 changes: 4 additions & 0 deletions)
@@ -135,6 +135,9 @@ class DDPConfig:
    ----------
    local_rank: Optional[int]
        Current local rank of the device (provided here, as LOCAL_RANK env var, or parsed from --local_arg)
    auto_mpi_discovery: bool, default: False
        If distributed environment variables are not set, attempt to discover them from MPI (using the underlying
        deepspeed function call)
    convert_to_sync_batch_norm: bool, default: False
        Automatically convert all batch norm calls to torch.nn.SyncBatchNorm calls
        https://pytorch.org/docs/stable/generated/torch.nn.SyncBatchNorm.html
@@ -168,6 +171,7 @@
    """

    local_rank: Optional[int]
    auto_mpi_discovery: bool = False
    convert_to_sync_batch_norm: bool = False
    backend: BackendOptions = "nccl"
    broadcast_buffers: bool = True
stoke/distributed.py (24 changes: 24 additions & 0 deletions)
@@ -14,6 +14,7 @@
import deepspeed as ds
import horovod.torch as hvd
import torch
from deepspeed.utils.distributed import mpi_discovery
from fairscale.optim.oss import OSS

from stoke.configs import ClipGradConfig, ClipGradNormConfig
@@ -490,11 +491,34 @@ def _create_ddp_handler(kwargs: dict):
    def _call_init(self):
        """Does any backend initialization work related to DDP setup

        Borrows code from DeepSpeed to set up DDP via OpenMPI
        https://github.com/microsoft/DeepSpeed/blob/master/deepspeed/utils/distributed.py

        Returns
        -------
        None

        """
        # Borrowing a bit of code from deepspeed
        required_env = [
            "RANK",
            "WORLD_SIZE",
            "MASTER_ADDR",
            "MASTER_PORT",
            "LOCAL_RANK",
        ]
        if self._ddp_config.auto_mpi_discovery and not all(
            map(lambda v: v in os.environ, required_env)
        ):
            try:
                from mpi4py import MPI

                mpi_discovery(verbose=True)
            except ImportError as e:
                print(
                    e,
                    ": mpi4py cannot be imported -- please install Stoke with the MPI option (pip install stoke[mpi])",
                )
        # Initialize call for DDP
        torch.distributed.init_process_group(
            backend=self._ddp_config.backend, init_method=self._ddp_config.init_method
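For reference, a rough, illustrative approximation of what the borrowed `mpi_discovery` call does (based on DeepSpeed's documented behavior, not a copy of its code): rank and world size come from `MPI.COMM_WORLD`, the local rank is derived from how many lower ranks share the host, and rank 0's hostname is broadcast as the master address before the environment variables are set.

```python
import os
import socket

from mpi4py import MPI


def rough_mpi_discovery(master_port: int = 29500) -> None:
    """Illustrative approximation of DeepSpeed's mpi_discovery -- not its actual code."""
    comm = MPI.COMM_WORLD
    rank, world_size = comm.Get_rank(), comm.Get_size()
    hostname = socket.gethostname()
    # Local rank = number of lower-ranked processes on the same host
    all_hosts = comm.allgather(hostname)
    local_rank = sum(1 for host in all_hosts[:rank] if host == hostname)
    # Everyone agrees on rank 0's hostname as the rendezvous address
    master_addr = comm.bcast(hostname if rank == 0 else None, root=0)
    os.environ.update(
        RANK=str(rank),
        WORLD_SIZE=str(world_size),
        LOCAL_RANK=str(local_rank),
        MASTER_ADDR=master_addr,
        MASTER_PORT=str(master_port),
    )
```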
stoke/stoke.py (7 changes: 2 additions & 5 deletions)
Expand Up @@ -736,7 +736,6 @@ def _get_fp16_mixin(self):
    def DataLoader(
        self,
        dataset: Dataset[T_co],
        batch_size: Optional[int] = 1,
        shuffle: bool = False,
        sampler: Optional[Sampler[int]] = None,
        batch_sampler: Optional[Sampler[Sequence[int]]] = None,
@@ -764,8 +763,6 @@ def DataLoader(
        ----------
        dataset: Dataset
            dataset from which to load the data.
        batch_size: int, default: 1
            how many samples per batch to load.
        shuffle: bool, default: False
            set to ``True`` to have the data reshuffled at every epoch.
        sampler: Sampler or Iterable, default: None

        if self._verbose and self.gpu:
            print(f"Automatically handling moving model input data to GPU(s)...")

        # Forward the already known options from the Stoke status
        return StokeDataLoader(
            gpu=self.gpu,
            fp16=self.fp16,
            batch_size=self.batch_size,
            dataset=dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            sampler=sampler,
            batch_sampler=batch_sampler,
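The `gpu`/`fp16` flags forwarded above are what back the "Automatically handling moving model input data to GPU(s)" message. A rough stand-in for that behavior (illustrative only, not the real `StokeDataLoader`): a `DataLoader` subclass that remembers the flags and moves tensor batches to the device while iterating.

```python
import torch
from torch.utils.data import DataLoader


# Illustrative only -- not Stoke's actual StokeDataLoader implementation
class GpuAwareLoader(DataLoader):
    def __init__(self, *args, gpu: bool = False, fp16=None, **kwargs):
        super().__init__(*args, **kwargs)
        self._gpu = gpu
        self._fp16 = fp16  # kept for parity with the call above; unused in this sketch

    def __iter__(self):
        for batch in super().__iter__():
            if self._gpu and torch.cuda.is_available():
                # move any tensor elements of the batch onto the GPU
                batch = [t.cuda(non_blocking=True) if torch.is_tensor(t) else t for t in batch]
            yield batch
```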
