Skip to content

Commit

Permalink
Optimize logging (#1276)
Browse files Browse the repository at this point in the history
* optimize logging

* fix ut

* lint
  • Loading branch information
BalaBalaYi authored Sep 23, 2024
1 parent fc2bd98 commit 97f39dc
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 13 deletions.
10 changes: 7 additions & 3 deletions dlrover/python/master/elastic_training/rdzv_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,15 @@ def _get_lacking_ranks(self) -> List[int]:
"""

lacking_ranks: List[int] = []
if self._rdzv_params is None or self._rdzv_params.min_nodes <= 0:
if (
self._rdzv_params is None
or self._rdzv_params.min_nodes <= 0
or self._rdzv_params.max_nodes <= 0
):
return lacking_ranks

min_required = self._rdzv_params.min_nodes
min_ranks = set([i for i in range(min_required)])
max_required = self._rdzv_params.max_nodes
min_ranks = set([i for i in range(max_required)])
if self._waiting_nodes:
waiting_ranks = set(self._waiting_nodes.keys())
else:
Expand Down
10 changes: 8 additions & 2 deletions dlrover/python/master/node/dist_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ def __init__(
raise ValueError(
f"Distribution strategy {job_args.distribution_strategy} "
"is not supported. You can specify it with "
"ParameterServerStrategy/AllreduceStrategy."
"ParameterServerStrategy/AllReduceStrategy."
)
logger.info("New job optimizer : %s", self._job_optimizer.__class__)
logger.info(f"New job optimizer: {self._job_optimizer.__class__}")

worker_restart_count = node_restart_count.get(NodeType.WORKER, 0)
ps_restart_count = node_restart_count.get(NodeType.PS, 0)
Expand All @@ -150,6 +150,12 @@ def __init__(
self._ps_relaunch_max_num = min(
ps_restart_count, _MAX_POD_RELAUNCH_COUNT
)
logger.info(
f"Worker relaunch number: {self._relaunch_on_worker_failure}; "
f"PS relaunch number: {self._ps_relaunch_max_num}; "
f"Critical worker index: {self._critical_worker_index}."
)

self._node_event_callbacks: List[NodeEventCallback] = []

# Protects followed variables, which are accessed from event_cb.
Expand Down
9 changes: 6 additions & 3 deletions dlrover/python/master/node/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,12 @@ def is_training_hang_by_pending(self, total_node_num) -> bool:
pending_nodes
):
logger.debug(
"Skip for no required nodes info " "and not all nodes pending."
"Skip for no required nodes info and not all nodes pending."
)
return False
elif 0 < len(pending_nodes) == total_node_num:
# all nodes pending
logger.debug(f"All nodes pending: {pending_nodes}.")
logger.info(f"All nodes pending: {pending_nodes}.")
else:
# partial nodes pending
# with condition 1 + 2
Expand Down Expand Up @@ -404,7 +404,10 @@ def is_training_hang_by_pending(self, total_node_num) -> bool:
if now - first_pending_node.create_time.timestamp() > timeout:
logger.warning(
f"Node {first_pending_node.name} "
f"exceeded pending timeout: {timeout}s."
f"exceeded pending timeout: {timeout}s, "
f"running nodes(size:{len(running_nodes)}): {running_nodes}, "
f"pending nodes(size:{len(pending_nodes)}): {pending_nodes}, "
f"min required nodes size: {self.get_min_nodes_required()}."
)
return True

Expand Down
11 changes: 6 additions & 5 deletions dlrover/python/tests/test_rdzv_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,23 +172,24 @@ def test_get_lacking_ranks(self):
rdzv_manager = ElasticTrainingRendezvousManager(error_monitor)

rdzv_manager._rdzv_params.min_nodes = 4
rdzv_manager._rdzv_params.max_nodes = 4
rdzv_manager._waiting_nodes = {0: 0, 1: 1, 2: 2, 3: 3}
self.assertEqual(rdzv_manager._get_lacking_ranks(), [])

rdzv_manager._rdzv_params.min_nodes = 5
rdzv_manager._rdzv_params.max_nodes = 5
self.assertEqual(rdzv_manager._get_lacking_ranks(), [4])

rdzv_manager._rdzv_params.min_nodes = 3
rdzv_manager._rdzv_params.max_nodes = 3
self.assertEqual(rdzv_manager._get_lacking_ranks(), [])

rdzv_manager._rdzv_params.min_nodes = 6
rdzv_manager._rdzv_params.max_nodes = 6
self.assertEqual(rdzv_manager._get_lacking_ranks(), [4, 5])

rdzv_manager._rdzv_params.min_nodes = 4
rdzv_manager._rdzv_params.max_nodes = 4
rdzv_manager._waiting_nodes = {}
self.assertEqual(rdzv_manager._get_lacking_ranks(), [0, 1, 2, 3])

rdzv_manager._rdzv_params.min_nodes = 0
rdzv_manager._rdzv_params.max_nodes = 0
self.assertEqual(rdzv_manager._get_lacking_ranks(), [])

def test_multi_updating_waiting_nodes(self):
Expand Down

0 comments on commit 97f39dc

Please sign in to comment.