diff --git a/composer/loggers/mlflow_logger.py b/composer/loggers/mlflow_logger.py index 9d46aaeeab..2905f033fe 100644 --- a/composer/loggers/mlflow_logger.py +++ b/composer/loggers/mlflow_logger.py @@ -35,7 +35,6 @@ DEFAULT_MLFLOW_EXPERIMENT_NAME = 'my-mlflow-experiment' LOG_DUPLICATED_METRIC_VALUE_PER_N_STEPS = 100 -LOG_DUPLICATED_METRIC_VALUE_PER_N_MILLIS = 600000 class MlflowMonitorProcess(multiprocessing.Process): @@ -123,8 +122,6 @@ class MLFlowLogger(LoggerDestination): log_duplicated_metric_every_n_steps (int, optional): The number of steps to wait before logging the duplicated metric value. Duplicated metric value means the new step has the same value as the previous step. (default: ``100``) - log_duplicated_metric_every_n_millis (int, optional): The number of milliseconds to wait - before logging the duplicated metric value. (default: ``600000``) """ def __init__( @@ -146,7 +143,6 @@ def __init__( resume: bool = False, logging_buffer_seconds: Optional[int] = 10, log_duplicated_metric_every_n_steps: int = 100, - log_duplicated_metric_every_n_millis: int = 600000, ) -> None: try: import mlflow @@ -189,7 +185,6 @@ def __init__( mlflow.set_system_metrics_sampling_interval(5) self.log_duplicated_metric_every_n_steps = log_duplicated_metric_every_n_steps - self.log_duplicated_metric_every_n_millis = log_duplicated_metric_every_n_millis self._metrics_cache = {} self._rank_zero_only = rank_zero_only @@ -403,25 +398,24 @@ def log_metrics(self, metrics: dict[str, Any], step: Optional[int] = None) -> No metrics_to_log = {} step = step or 0 - current_time_millis = int(time.time() * 1000) for k, v in metrics.items(): if any(fnmatch.fnmatch(k, pattern) for pattern in self.ignore_metrics): continue if k in self._metrics_cache: - value, last_step, last_time = self._metrics_cache[k] - if value == v and step < last_step + self.log_duplicated_metric_every_n_steps and current_time_millis < last_time + self.log_duplicated_metric_every_n_millis: + value, last_step = self._metrics_cache[k] + if value == v and step < last_step + self.log_duplicated_metric_every_n_steps: # Skip logging the metric if it has the same value as the last step and it's - # within the step and time window. + # within the step window. continue else: - # Log the metric if it has a different value or it's outside the step and time - # window, and update the metrics cache. - self._metrics_cache[k] = (v, step, current_time_millis) + # Log the metric if it has a different value or it's outside the step window, + # and update the metrics cache. + self._metrics_cache[k] = (v, step) metrics_to_log[self.rename(k)] = float(v) else: # Log the metric if it's the first time it's being logged, and update the metrics # cache. - self._metrics_cache[k] = (v, step, current_time_millis) + self._metrics_cache[k] = (v, step) metrics_to_log[self.rename(k)] = float(v) log_metrics( diff --git a/tests/loggers/test_mlflow_logger.py b/tests/loggers/test_mlflow_logger.py index 962da64562..9d84baa06f 100644 --- a/tests/loggers/test_mlflow_logger.py +++ b/tests/loggers/test_mlflow_logger.py @@ -709,6 +709,7 @@ def test_mlflow_ignore_metrics(self, num_batches, device, ignore_metrics, expect logger = MLFlowLogger( tracking_uri=tmp_path / Path('my-test-mlflow-uri'), ignore_metrics=ignore_metrics, + log_duplicated_metric_every_n_steps=0, ) file_path = self.run_trainer(logger, num_batches) @@ -854,52 +855,25 @@ def test_mlflow_logging_with_metrics_dedupping(tmp_path): run_name='test_run', logging_buffer_seconds=2, log_duplicated_metric_every_n_steps=3, - log_duplicated_metric_every_n_millis=10000, ) test_mlflow_logger.init(state=mock_state, logger=mock_logger) - # # Test dedupping of metrics and duplicated metrics get logged per - # # `log_duplicated_metric_every_n_steps` steps. - # steps = 10 - # for i in range(steps): - # # 'foo' always have different values, while 'bar' always have the same value. - # metrics = { - # 'foo': i, - # 'bar': 0, - # } - # test_mlflow_logger.log_metrics(metrics, step=i) - - # if i % 3 == 0: - # # 'bar' will be logged every 3 steps. - # mock_log_metrics.assert_called_with(metrics={'foo': float(i), 'bar': 0.0}, step=i, synchronous=False) - # else: - # # 'bar' will not be logged. - # mock_log_metrics.assert_called_with(metrics={'foo': float(i)}, step=i, synchronous=False) - # Test dedupping of metrics and duplicated metrics get logged per - # `log_duplicated_metric_every_n_millis` milliseconds. - timestamps = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45] - # Reset the metrics cache. - test_mlflow_logger._metrics_cache = {} - with patch('time.time', side_effect=timestamps): - for i in range(len(timestamps)): - # 'foo' always have different values, while 'bar' always have the same value. - metrics = { - 'foo': i, - 'bar': 0, - } - test_mlflow_logger.log_metrics(metrics, step=0) - - if i % 2 == 0: - # 'bar' will be logged every 2 steps. - mock_log_metrics.assert_called_with( - metrics={ - 'foo': float(i), - 'bar': 0.0, - }, step=0, synchronous=False, - ) - else: - # 'bar' will not be logged. - mock_log_metrics.assert_called_with(metrics={'foo': float(i)}, step=0, synchronous=False) + # `log_duplicated_metric_every_n_steps` steps. + steps = 10 + for i in range(steps): + # 'foo' always have different values, while 'bar' always have the same value. + metrics = { + 'foo': i, + 'bar': 0, + } + test_mlflow_logger.log_metrics(metrics, step=i) + + if i % 3 == 0: + # 'bar' will be logged every 3 steps. + mock_log_metrics.assert_called_with(metrics={'foo': float(i), 'bar': 0.0}, step=i, synchronous=False) + else: + # 'bar' will not be logged. + mock_log_metrics.assert_called_with(metrics={'foo': float(i)}, step=i, synchronous=False) test_mlflow_logger.post_close()