Skip to content

Commit

Permalink
support for vanilla single node deepspeed executions (#10)
Browse files Browse the repository at this point in the history
- New public interface named `DeepspeedExecutor.for_single_node`
- This interface allows calling `DeepspeedExecutor` with the vanilla `@kubernetes` decorator.
- a minimal example to showcase how it can be used
  • Loading branch information
emattia authored May 1, 2024
1 parent 346644d commit 9c4e75d
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 19 deletions.
5 changes: 5 additions & 0 deletions examples/single-node/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Run a Deepspeed hello world! Check out `train.py` to see the script passed to the Deepspeed launcher.

```
python flow.py run
```
31 changes: 31 additions & 0 deletions examples/single-node/flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from metaflow import FlowSpec, step, kubernetes

# Container image passed to the @kubernetes decorator on the `train` step.
IMAGE = "docker.io/eddieob/deepspeed:6"
# Memory request passed to @kubernetes (presumably in megabytes — confirm against the Metaflow docs).
MEMORY = "16000"
# CPU request passed to @kubernetes; also reused as `n_slots` for the Deepspeed executor.
N_CPU = 2


class SingleNodeDeepspeedTest(FlowSpec):
    """Minimal flow that launches `train.py` through Deepspeed on one Kubernetes task."""

    @step
    def start(self):
        """Entry step; hands off straight to `train`."""
        self.next(self.train)

    @kubernetes(image=IMAGE, memory=MEMORY, cpu=N_CPU)
    @step
    def train(self):
        """Build a single-node Deepspeed executor and run the training script with it."""
        # Imported inside the step so it resolves in the environment the step runs in.
        from metaflow.plugins.deepspeed_libs.executor import DeepspeedExecutor

        # CPU-only run: one Deepspeed slot per CPU requested from Kubernetes.
        single_node_executor = DeepspeedExecutor.for_single_node(
            flow=self,
            use_gpu=False,
            n_slots=N_CPU,
        )
        single_node_executor.run(entrypoint="train.py")
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing to do."""
        pass


if __name__ == "__main__":
    SingleNodeDeepspeedTest()
20 changes: 20 additions & 0 deletions examples/single-node/train.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from argparse import ArgumentParser
import time
from deepspeed import comm as dist
from deepspeed import init_distributed

if __name__ == "__main__":
    # The distributed launcher passes --local_rank to every process it spawns.
    cli = ArgumentParser()
    cli.add_argument(
        "--local_rank",
        default=-1,
        type=int,
        help="local rank passed from distributed launcher",
    )
    args = cli.parse_args()

    # Initialise the process group with the gloo backend.
    init_distributed(dist_backend="gloo")
    global_rank = dist.get_rank()

    # Pause, then wait at the barrier until every rank has arrived before reporting.
    time.sleep(3)
    dist.barrier()
    print(f"I am global rank {global_rank} | local rank {args.local_rank}")
    time.sleep(3)
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,11 @@ def _setup_current(self, hosts, n_slots_per_host, is_gpu, flow):
current._update_env(
{
"deepspeed": DeepspeedExecutor(
hosts,
n_slots_per_host,
is_gpu,
flow,
self.attributes["worker_polling_freq"],
self.flow_datastore,
hosts=hosts,
n_slots_per_host=n_slots_per_host,
is_gpu=is_gpu,
worker_polling_freq=self.attributes["worker_polling_freq"],
flow_datastore=self.flow_datastore,
)
}
)
Expand Down Expand Up @@ -94,10 +93,10 @@ def setup_distributed_env(
):
"Return a list of strings of hostnames of nodes to use for MPI"
hosts = setup_mpi_env(
self._ubf_context,
self.attributes["all_nodes_started_timeout"],
self.n_slots,
self.flow_datastore,
ubf_context=self._ubf_context,
all_nodes_started_timeout=self.attributes["all_nodes_started_timeout"],
n_slots=self.n_slots,
flow_datastore=self.flow_datastore,
)
self._setup_current(
hosts=hosts, n_slots_per_host=self.n_slots, is_gpu=self.is_gpu, flow=flow
Expand Down
19 changes: 15 additions & 4 deletions metaflow_extensions/deepspeed/plugins/deepspeed_libs/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,12 @@ def __init__(
hosts: List[str],
n_slots_per_host: int = 1,
is_gpu: bool = False,
flow=None,
worker_polling_freq: int = 10,
flow_datastore=None,
flow_datastore = None,
) -> None:
self.is_gpu = is_gpu
self.n_slots_per_host = n_slots_per_host
self.hosts = hosts
self.flow = flow
self.worker_polling_freq = worker_polling_freq
self._flow_datastore = flow_datastore
self._heartbeat_thread = None # This is only set on the control task
Expand Down Expand Up @@ -260,7 +258,7 @@ def run(
"""upon completion returns the final path of the `cloud_output_dir` if `push_results_dir_to_cloud` is set to true"""
from metaflow import current

node_index = current.parallel.node_index # assumes parallel
node_index = current.parallel.node_index or 0
datastore = DeepspeedDatastore(
flow_datastore=self._flow_datastore, pathspec=current.pathspec
)
Expand Down Expand Up @@ -294,3 +292,16 @@ def run(
file=sys.stderr,
)
return datastore.get_datastore_file_location(cloud_output_dir)

@classmethod
def for_single_node(cls, flow, use_gpu=False, n_slots=1):
    """Alternate constructor for running Deepspeed inside a single task.

    Runs `setup_mpi_env` against the flow's parent datastore and builds an
    executor from the hosts it returns.

    Parameters
    ----------
    flow
        The running flow; its `_datastore.parent_datastore` is used as the
        flow datastore.
    use_gpu : bool
        Forwarded to the constructor as `is_gpu`.
    n_slots : int
        Forwarded to `setup_mpi_env` and to the constructor as
        `n_slots_per_host`.

    Returns
    -------
    An instance of `cls` configured with the hosts from `setup_mpi_env`.
    """
    # Local import, as in the original (presumably to avoid an import cycle — confirm).
    from .mpi_setup import setup_mpi_env

    parent_datastore = flow._datastore.parent_datastore
    return cls(
        hosts=setup_mpi_env(flow_datastore=parent_datastore, n_slots=n_slots),
        n_slots_per_host=n_slots,
        is_gpu=use_gpu,
        flow_datastore=parent_datastore,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,11 @@ def setup_known_hosts(hosts, ubf_context, node_index):


def setup_mpi_env(
ubf_context,
all_nodes_started_timeout,
n_slots,
flow_datastore,
polling_frequency=0.1,
ubf_context: str = UBF_CONTROL,
all_nodes_started_timeout: int = 600,
n_slots: int = 1,
polling_frequency: float = 0.1,
):
"""
1. create ssh keys on each host
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from setuptools import setup, find_namespace_packages

version = "0.0.6"
version = "0.0.7"

setup(
name="metaflow-deepspeed",
Expand Down

0 comments on commit 9c4e75d

Please sign in to comment.