diff --git a/dlrover/python/common/constants.py b/dlrover/python/common/constants.py index f6cd2cf84..3841aba74 100644 --- a/dlrover/python/common/constants.py +++ b/dlrover/python/common/constants.py @@ -212,6 +212,7 @@ class NodeEnv(object): RELAUNCHED_POD = "RELAUNCHED_POD" DLROVER_MASTER_ADDR = "DLROVER_MASTER_ADDR" GRPC_ENABLE_FORK = "GRPC_ENABLE_FORK_SUPPORT" + GRPC_POLL_STRATEGY = "GRPC_POLL_STRATEGY" POD_NAME = "POD_NAME" MONITOR_ENABLED = "MONITOR_ENABLED" JOB_NAME = "ELASTIC_JOB_NAME" diff --git a/dlrover/python/elastic_agent/master_client.py b/dlrover/python/elastic_agent/master_client.py index e646da113..2ad1bb5e3 100644 --- a/dlrover/python/elastic_agent/master_client.py +++ b/dlrover/python/elastic_agent/master_client.py @@ -29,7 +29,7 @@ def retry_grpc_request(func): def wrapper(self, *args, **kwargs): retry = kwargs.get("retry", 10) - execption = None + exception = None for i in range(retry): try: return func(self, *args, **kwargs) @@ -39,11 +39,11 @@ def wrapper(self, *args, **kwargs): logger.warning( f"Retry {i} to {class_name}.{func_name} with failure", ) - execption = e + exception = e time.sleep(5) - if execption: - logger.error(execption) - raise execption + if exception: + logger.error(exception) + raise exception return wrapper diff --git a/dlrover/python/master/scaler/pod_scaler.py b/dlrover/python/master/scaler/pod_scaler.py index 3c41da9f6..eb7897ab1 100644 --- a/dlrover/python/master/scaler/pod_scaler.py +++ b/dlrover/python/master/scaler/pod_scaler.py @@ -472,9 +472,12 @@ def _create_pod(self, node: Node): env.append(V1EnvVar(name=NodeEnv.JOB_NAME, value=self._job_name)) env.append(V1EnvVar(name=NodeEnv.JOB_UID, value=self._job_uid)) - # A deadlock can happen when pthread_atfork handler is running. 
- # For detail https://chromium.googlesource.com/external/github.com/grpc/grpc/+/refs/tags/v1.19.0-pre1/doc/fork_support.md # noqa: E501 - env.append(V1EnvVar(name=NodeEnv.GRPC_ENABLE_FORK, value="False")) + # At the cost of increased performance overhead, these provide greater + # stability in concurrent scenarios (requires grpcio>=1.58). + # Historical background: https://chromium.googlesource.com/external/ + # github.com/grpc/grpc/+/refs/tags/v1.19.0-pre1/doc/fork_support.md + env.append(V1EnvVar(name=NodeEnv.GRPC_ENABLE_FORK, value="true")) + env.append(V1EnvVar(name=NodeEnv.GRPC_POLL_STRATEGY, value="poll")) worker_num = self._config_worker_num if worker_num == 0: diff --git a/dlrover/python/tests/test_master_client.py b/dlrover/python/tests/test_master_client.py index e24f296ff..8b6a38d12 100644 --- a/dlrover/python/tests/test_master_client.py +++ b/dlrover/python/tests/test_master_client.py @@ -35,6 +35,7 @@ def tearDown(self): self._master.stop() def test_open_channel(self): + self.assertEqual(self._master_client._timeout, 0.5) self.assertEqual(self._master_client._timeout, 0.5) self._master_client.close_channel() self._master_client.open_channel()