Added first draft of raydeepspeed strategy
annalappe committed Nov 6, 2024
1 parent 29fa7a3 commit e78cc0b
Showing 6 changed files with 167 additions and 61 deletions.
77 changes: 67 additions & 10 deletions src/itwinai/torch/distributed.py
@@ -16,7 +16,7 @@
from torch.optim.optimizer import Optimizer
from torch.utils.data import DataLoader, Dataset, DistributedSampler, Sampler
from torch.utils.data.dataloader import T_co, _collate_fn_t, _worker_init_fn_t

from deepspeed.accelerator import get_accelerator
from ..distributed import DistributedStrategy
from .type import DistributedStrategyError, UninitializedStrategyError

@@ -290,7 +290,8 @@ def create_dataloader(
if self.is_distributed:
if sampler is None:
sampler = DistributedSampler(
dataset, num_replicas=self.global_world_size(),
dataset,
num_replicas=self.global_world_size(),
rank=self.global_rank(),
shuffle=shuffle
)
@@ -596,7 +597,6 @@ def init(self) -> None:
os.environ['OMPI_COMM_WORLD_LOCAL_RANK'] = os.environ.get(
'LOCAL_RANK', ompi_lrank
)

# https://deepspeed.readthedocs.io/en/latest/initialize.html#training-initialization
self.deepspeed.init_distributed(dist_backend=self.backend)
self.is_initialized = True
@@ -1127,19 +1127,64 @@ def __init__(
self,
backend: Literal['nccl', 'gloo', 'mpi']
) -> None:
# Set local rank

# os.environ['RANK'] = os.environ.get('SLURM_PROCID')
# os.environ['WORLD_SIZE'] = os.environ.get('SLURM_NTASKS')
# os.environ['LOCAL_RANK'] = os.environ.get('SLURM_LOCALID')

# print("Environment variables: ")
# print(os.environ.get('MASTER_ADDR'))
# print(os.environ.get('MASTER_PORT'))
# print(os.environ['RANK'])
# print(os.environ['WORLD_SIZE'])
# print(os.environ['LOCAL_RANK'])

# print(os.environ)

# deepspeed.init_distributed()

# if not distributed_resources_available():
# raise RuntimeError(
# "Trying to run distributed on insufficient resources.")

# https://github.com/Lightning-AI/pytorch-lightning/issues/13567
# ompi_lrank = os.environ.get('OMPI_COMM_WORLD_LOCAL_RANK')
# os.environ['OMPI_COMM_WORLD_LOCAL_RANK'] = os.environ.get(
# 'LOCAL_RANK', ompi_lrank)
# 'LOCAL_RANK', ompi_lrank
# )
# https://deepspeed.readthedocs.io/en/latest/initialize.html#training-initialization
# print(backend)

deepspeed.init_distributed(dist_backend=backend)
# print("Trying to initialize strategy...")
# deepspeed.init_distributed(dist_backend=backend)
self.is_initialized = True
# print("Successfully initialized strategy!")

self.set_device()
# self.set_device()

super().__init__()

def init(self) -> None:
pass

# if not distributed_resources_available():
# raise RuntimeError(
# "Trying to run distributed on insufficient resources.")

# https://github.com/Lightning-AI/pytorch-lightning/issues/13567
# This block of code should be removed at some point
if os.environ.get('LOCAL_RANK'):
os.environ['OMPI_COMM_WORLD_LOCAL_RANK'] = os.environ.get('LOCAL_RANK')

# https://deepspeed.readthedocs.io/en/latest/initialize.html#training-initialization
# print(backend)
deepspeed.init_distributed()
# print("Trying to initialize strategy...")
# deepspeed.init_distributed(dist_backend=backend)
self.is_initialized = True
# print("Successfully initialized strategy!")

print(os.environ)
self.set_device()

def global_world_size(self) -> int:
return dist.get_world_size()
@@ -1151,7 +1196,7 @@ def global_rank(self) -> int:
return dist.get_rank()

def local_rank(self) -> int:
dist.get_rank() % torch.cuda.device_count()
return dist.get_rank() % torch.cuda.device_count()

def distributed(
self,
@@ -1162,11 +1207,16 @@ def distributed(
**init_kwargs
) -> Tuple[Module | Optimizer | LRScheduler | None]:

master_port = os.environ.get('MASTER_PORT')

distrib_model, optimizer, _, lr_scheduler = deepspeed.initialize(
model=model,
model_parameters=model_parameters,
optimizer=optimizer,
lr_scheduler=lr_scheduler,
distributed_port=master_port,

dist_init_required=True,
**init_kwargs
)
return distrib_model, optimizer, lr_scheduler
@@ -1177,11 +1227,18 @@ def create_dataloader(
batch_size: Optional[int] = 1,
shuffle: Optional[bool] = None,
sampler: Union[Sampler, Iterable, None] = None,
batch_sampler: Union[Sampler[List], Iterable[List], None] = None,
collate_fn: Optional[Callable[[List], Any]] = None
):
if batch_sampler is not None:
print(
"WARNING: batch_sampler is ignored by TorchDistributedStrategy"
)

if sampler is None:
sampler = DistributedSampler(
dataset, num_replicas=self.global_world_size(),
dataset,
num_replicas=self.global_world_size(),
rank=self.global_rank(),
shuffle=shuffle
)
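Editor's note: for orientation, below is a minimal usage sketch of the DeepSpeed-based strategy this file introduces. It is an assumption-laden example, not code from the repository: the class name RayDeepSpeedStrategy, the toy model/dataset, and the DeepSpeed config dict are placeholders; only the method names (init, distributed, create_dataloader) and the reliance on MASTER_PORT follow the diff above.

# Hedged usage sketch; class name and data are placeholders, methods follow the diff.
import torch
from torch.utils.data import TensorDataset

from itwinai.torch.distributed import RayDeepSpeedStrategy  # assumed class name

strategy = RayDeepSpeedStrategy(backend='nccl')
strategy.init()  # maps LOCAL_RANK -> OMPI_COMM_WORLD_LOCAL_RANK, then calls deepspeed.init_distributed()

model = torch.nn.Linear(16, 1)                                        # toy model
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
dataset = TensorDataset(torch.randn(256, 16), torch.randn(256, 1))    # toy data

# distributed() wraps deepspeed.initialize(); MASTER_PORT (set by the launcher)
# is forwarded as distributed_port, and extra kwargs reach deepspeed.initialize().
ds_config = {"train_micro_batch_size_per_gpu": 32}  # placeholder DeepSpeed config
model, optimizer, lr_scheduler = strategy.distributed(
    model,
    optimizer=optimizer,
    model_parameters=model.parameters(),
    config=ds_config,
)

# create_dataloader() attaches a DistributedSampler when no sampler is passed.
loader = strategy.create_dataloader(dataset, batch_size=32, shuffle=True)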
42 changes: 24 additions & 18 deletions src/itwinai/torch/trainer.py
@@ -1304,14 +1304,12 @@ def __init__(
logger: Optional[Logger] = None
) -> None:
super().__init__(name=name)
print("Getting at least to the init function!")
self.logger = logger
print(strategy)
self.strategy = self._initialize_strategy(strategy)

self._set_configs(config=config)
self.logger = logger

self._initialize_ray()
self.strategy = self._initialize_strategy(strategy)
self._set_configs(config=config)

@property
def device(self) -> str:
@@ -1323,7 +1321,7 @@ def _set_configs(self, config: Dict):
self.config = deep_update(DEFAULT_CONFIG, config)

self._set_scaling_config()
self._set_tune_config()
# self._set_tune_config()
self._set_run_config()
self._set_train_loop_config()

@@ -1336,10 +1334,15 @@ def _initialize_ray(self):
raise EnvironmentError(
"Ray initialization requires 'ip_head' and 'head_node_ip' to be set.")

# if not ray.is_initialized():
# ray.init(
# address=ip_head,
# _node_ip_address=head_node_ip
# )

if not ray.is_initialized():
ray.init(
address=ip_head,
_node_ip_address=head_node_ip
address="auto"
)

except Exception as e:
@@ -1412,19 +1415,22 @@ def execute(
)
trainer = ray.train.torch.TorchTrainer(
train_with_data,
train_loop_config=self.train_loop_config,
scaling_config=self.scaling_config,
run_config=self.run_config
)
param_space = {
"train_loop_config": self.train_loop_config
}
tuner = tune.Tuner(
trainer,
param_space=param_space,
tune_config=self.tune_config
)

result_grid = tuner.fit()
# param_space = {
# "train_loop_config": self.train_loop_config
# }
# tuner = tune.Tuner(
# trainer,
# param_space=param_space,
# tune_config=self.tune_config
# )

# result_grid = tuner.fit()

result_grid = trainer.fit()

return train_dataset, validation_dataset, test_dataset, result_grid

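Editor's note: the execute() change above drops the tune.Tuner wrapper and calls TorchTrainer.fit() directly, so the run now returns a single Ray Train Result rather than a Tuner ResultGrid. The sketch below illustrates that pattern under stated assumptions: the worker function, worker counts, and config values are placeholders, not taken from the repository.

# Hedged sketch of the Ray Train flow this commit moves to: TorchTrainer.fit()
# called directly, without a tune.Tuner around it.
import os
import ray
from ray.train import RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer


def train_with_data(config):
    # Placeholder per-worker loop; the real one is built by the itwinai trainer.
    for epoch in range(config["epochs"]):
        ray.train.report({"epoch": epoch, "loss": 0.0})  # dummy metric


ray.init(address="auto")  # mirrors the simplified _initialize_ray() above

trainer = TorchTrainer(
    train_with_data,
    train_loop_config={"batch_size": 64, "learning_rate": 5e-4, "epochs": 2},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
    run_config=RunConfig(
        name="Virgo-HPO-Experiment",
        storage_path=os.path.abspath("ray_checkpoints"),
    ),
)
result = trainer.fit()  # a single Result, not the Tuner's ResultGrid
print(result.metrics)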
6 changes: 5 additions & 1 deletion use-cases/mnist/torch-lightning/config.yaml
@@ -52,7 +52,11 @@ training_pipeline:
limit_train_batches: null
limit_val_batches: null
log_every_n_steps: null
logger: null
logger:
class_path: itwinai.loggers.PyTorchLightningAdapter
init_args:
itwinai_logger:
class_path: itwinai.loggers.MlFlowLogger
max_epochs: 5
max_steps: -1
max_time: null
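Editor's note: the new logger block wires an itwinai MLflow logger into Lightning through an adapter. As a rough illustration of what that YAML instantiates (class paths are taken from the config above; constructor arguments and defaults are assumptions):

# Hedged sketch of what the YAML 'logger' block resolves to; MlFlowLogger's
# default constructor arguments are an assumption, not taken from the repo.
from itwinai.loggers import MlFlowLogger, PyTorchLightningAdapter

itwinai_logger = MlFlowLogger()  # assumed defaults
lightning_logger = PyTorchLightningAdapter(itwinai_logger=itwinai_logger)
# lightning.Trainer(logger=lightning_logger, max_epochs=5, ...) would then log through MLflow.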
54 changes: 27 additions & 27 deletions use-cases/virgo/config.yaml
@@ -100,13 +100,13 @@ ray_training_pipeline:
init_args:
config:
scaling_config:
num_workers: 2
num_workers: 4
use_gpu: true
resources_per_worker:
CPU: 5
GPU: 1
tune_config:
num_samples: 2
# tune_config:
# num_samples: 2
# scheduler:
# name: asha
# max_t: 10
@@ -117,13 +117,13 @@ ray_training_pipeline:
storage_path: ray_checkpoints
name: Virgo-HPO-Experiment
train_loop_config:
batch_size:
type: choice
options: [32, 64, 128]
learning_rate:
type: uniform
min: 1e-5
max: 1e-3
batch_size: 64
# type: choice
# options: [32, 64, 128]
learning_rate: 0.0005
# type: uniform
# min: 1e-5
# max: 1e-3
epochs: 2
generator: simple #unet
loss: L1
@@ -163,31 +163,31 @@ ray_training_pipeline_small:
init_args:
config:
scaling_config:
num_workers: 2
num_workers: 4
use_gpu: true
resources_per_worker:
CPU: 5
GPU: 1
tune_config:
num_samples: 4
scheduler:
name: asha
max_t: 10
grace_period: 5
reduction_factor: 4
brackets: 1
# tune_config:
# num_samples: 2
# scheduler:
# name: asha
# max_t: 10
# grace_period: 5
# reduction_factor: 4
# brackets: 1
run_config:
storage_path: ray_checkpoints
name: Virgo-HPO-Experiment
train_loop_config:
batch_size:
type: choice
options: [32, 64, 128]
learning_rate:
type: uniform
min: 1e-5
max: 1e-3
epochs: 10
batch_size: 64
# type: choice
# options: [32, 64, 128]
learning_rate: 0.0005
# type: uniform
# min: 1e-5
# max: 1e-3
epochs: 2
generator: simple #unet
loss: L1
save_best: false
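Editor's note: with the search spaces commented out, each train_loop_config entry is now a fixed scalar that Ray passes straight to the per-worker function. A small sketch of how those keys would typically be read there (the function name is a placeholder; key names mirror the YAML above):

# Hedged sketch: consuming the now-fixed train_loop_config inside the worker.
def train_with_data(config: dict) -> None:
    batch_size = config["batch_size"]        # 64, previously a Tune 'choice' space
    learning_rate = config["learning_rate"]  # 0.0005, previously a Tune 'uniform' range
    epochs = config["epochs"]                # 2
    generator = config["generator"]          # "simple"
    loss_name = config["loss"]               # "L1"
    # ...build the generator and loss from these values and run the training epochs...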
5 changes: 4 additions & 1 deletion use-cases/virgo/slurm_ray.sh
@@ -34,7 +34,7 @@ num_cpus=$SLURM_CPUS_PER_TASK
# This tells Tune to not change the working directory to the trial directory
# which makes relative paths accessible from inside a trial
export RAY_CHDIR_TO_TRIAL_DIR=0

export RAY_DEDUP_LOGS=0
export RAY_USAGE_STATS_DISABLE=1

######### Set up Ray cluster ########
Expand All @@ -51,6 +51,9 @@ port=7639 # This port will be used by Ray to communicate with worker nodes
export ip_head="$head_node"i:"$port"
export head_node_ip="$head_node"i

export MASTER_ADDR=$head_node_ip
export MASTER_PORT=$port

echo "Starting HEAD at $head_node"
# Start Ray on the head node.
# The `--head` option specifies that this node will be the head of the Ray cluster.
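Editor's note: the two new exports hand the Ray head node's address and port to torch.distributed/DeepSpeed, which is what the strategy's distributed() relies on when it forwards MASTER_PORT as distributed_port. A brief sketch of that hand-off on the Python side (the values in comments are illustrative):

# Hedged sketch of how the new MASTER_ADDR/MASTER_PORT exports are consumed.
import os
import deepspeed

master_addr = os.environ.get("MASTER_ADDR")  # "<head_node>i" from slurm_ray.sh
master_port = os.environ.get("MASTER_PORT")  # "7639" from slurm_ray.sh

# deepspeed.init_distributed() reads MASTER_ADDR/MASTER_PORT from the environment...
deepspeed.init_distributed(dist_backend="nccl")
# ...and the strategy later reuses the same port explicitly:
# deepspeed.initialize(..., distributed_port=master_port)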