From 34984bc372cb5797213993ff28a5fb73f298a588 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Tue, 29 Oct 2024 13:58:47 +0100 Subject: [PATCH 01/13] Add top level trace to ert Adds a top level trace to ert. Add a span processor through the add_span_processor pluggin hook to export the trace to e.g. azure --- src/ert/__main__.py | 75 ++++++++++++------- .../plugins/hook_specifications/__init__.py | 6 +- .../plugins/hook_specifications/logging.py | 12 +++ src/ert/plugins/plugin_manager.py | 14 ++++ 4 files changed, 80 insertions(+), 27 deletions(-) diff --git a/src/ert/__main__.py b/src/ert/__main__.py index aab9cdcbbb9..87b151d24d6 100755 --- a/src/ert/__main__.py +++ b/src/ert/__main__.py @@ -14,6 +14,11 @@ from uuid import UUID import yaml +from opentelemetry import trace +from opentelemetry.instrumentation.threading import ThreadingInstrumentor +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import SpanLimits, TracerProvider +from opentelemetry.trace import Status, StatusCode import ert.shared from _ert.threading import set_signal_handler @@ -652,6 +657,12 @@ def main() -> None: # Have ErtThread re-raise uncaught exceptions on main thread set_signal_handler() + ThreadingInstrumentor().instrument() + resource = Resource(attributes={SERVICE_NAME: "ert"}) + tracer_provider = TracerProvider( + resource=resource, span_limits=SpanLimits(max_events=128 * 16) + ) + trace.set_tracer_provider(tracer_provider) args = ert_parser(None, sys.argv[1:]) @@ -677,32 +688,44 @@ def main() -> None: handler.setLevel(logging.INFO) root_logger.addHandler(handler) - try: - with ErtPluginContext(logger=logging.getLogger()) as context: - logger.info(f"Running ert with {args}") - args.func(args, context.plugin_manager) - except ErtCliError as err: - logger.debug(str(err)) - sys.exit(str(err)) - except ConfigValidationError as err: - err_msg = err.cli_message() - logger.debug(err_msg) - sys.exit(err_msg) - except BaseException as err: - logger.exception(f'ERT crashed unexpectedly with "{err}"') - - logfiles = set() # Use set to avoid duplicates... - for loghandler in logging.getLogger().handlers: - if isinstance(loghandler, logging.FileHandler): - logfiles.add(loghandler.baseFilename) - - msg = f'ERT crashed unexpectedly with "{err}".\nSee logfile(s) for details:' - msg += "\n " + "\n ".join(logfiles) - - sys.exit(msg) - finally: - log_process_usage() - os.environ.pop("ERT_LOG_DIR") + with trace.get_tracer("ert.main").start_as_current_span( + "ert.application.start" + ) as span: + try: + with ErtPluginContext( + logger=logging.getLogger(), trace_provider=tracer_provider + ) as context: + logger.info(f"Running ert with {args}") + args.func(args, context.plugin_manager) + except ErtCliError as err: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(err) + logger.debug(str(err)) + sys.exit(str(err)) + except ConfigValidationError as err: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(err) + err_msg = err.cli_message() + logger.debug(err_msg) + sys.exit(err_msg) + except BaseException as err: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(err) + logger.exception(f'ERT crashed unexpectedly with "{err}"') + + logfiles = set() # Use set to avoid duplicates... + for loghandler in logging.getLogger().handlers: + if isinstance(loghandler, logging.FileHandler): + logfiles.add(loghandler.baseFilename) + + msg = f'ERT crashed unexpectedly with "{err}".\nSee logfile(s) for details:' + msg += "\n " + "\n ".join(logfiles) + + sys.exit(msg) + finally: + log_process_usage() + os.environ.pop("ERT_LOG_DIR") + ThreadingInstrumentor().uninstrument() if __name__ == "__main__": diff --git a/src/ert/plugins/hook_specifications/__init__.py b/src/ert/plugins/hook_specifications/__init__.py index 0f76b317564..60f946f9dc5 100644 --- a/src/ert/plugins/hook_specifications/__init__.py +++ b/src/ert/plugins/hook_specifications/__init__.py @@ -13,11 +13,15 @@ job_documentation, legacy_ertscript_workflow, ) -from .logging import add_log_handle_to_root +from .logging import ( + add_log_handle_to_root, + add_span_processor, +) from .site_config import site_config_lines __all__ = [ "add_log_handle_to_root", + "add_span_processor", "ecl100_config_path", "ecl300_config_path", "flow_config_path", diff --git a/src/ert/plugins/hook_specifications/logging.py b/src/ert/plugins/hook_specifications/logging.py index 888c875efae..e8f07bf32ff 100644 --- a/src/ert/plugins/hook_specifications/logging.py +++ b/src/ert/plugins/hook_specifications/logging.py @@ -1,5 +1,7 @@ import logging +from opentelemetry.sdk.trace.export import BatchSpanProcessor + from ert.plugins.plugin_manager import hook_specification @@ -11,3 +13,13 @@ def add_log_handle_to_root() -> logging.Handler: # type: ignore :return: A log handle that will be added to the root logger """ + + +@hook_specification +def add_span_processor() -> BatchSpanProcessor: # type: ignore + """ + Create a BatchSpanProcessor which will be added to the trace provider + in ert. + + :return: A BatchSpanProcessor that will be added to the trace provider in ert + """ diff --git a/src/ert/plugins/plugin_manager.py b/src/ert/plugins/plugin_manager.py index 78e341f260e..1744f63d646 100644 --- a/src/ert/plugins/plugin_manager.py +++ b/src/ert/plugins/plugin_manager.py @@ -26,6 +26,7 @@ ) import pluggy +from opentelemetry.sdk.trace import TracerProvider from .workflow_config import WorkflowConfigs @@ -322,17 +323,26 @@ def add_logging_handle_to_root(self, logger: logging.Logger) -> None: for handle in handles: logger.addHandler(handle) + def add_span_processor_to_trace_provider( + self, trace_provider: TracerProvider + ) -> None: + span_processors = self.hook.add_span_processor() + for span_processor in span_processors: + trace_provider.add_span_processor(span_processor) + class ErtPluginContext: def __init__( self, plugins: Optional[List[object]] = None, logger: Optional[logging.Logger] = None, + trace_provider: Optional[TracerProvider] = None, ) -> None: self.plugin_manager = ErtPluginManager(plugins=plugins) self.tmp_dir: Optional[str] = None self.tmp_site_config_filename: Optional[str] = None self._logger = logger + self._trace_provider = trace_provider def _create_site_config(self, tmp_dir: str) -> Optional[str]: site_config_content = self.plugin_manager.get_site_config_content() @@ -348,6 +358,10 @@ def _create_site_config(self, tmp_dir: str) -> Optional[str]: def __enter__(self) -> ErtPluginContext: if self._logger is not None: self.plugin_manager.add_logging_handle_to_root(logger=self._logger) + if self._trace_provider is not None: + self.plugin_manager.add_span_processor_to_trace_provider( + trace_provider=self._trace_provider + ) logger.debug(str(self.plugin_manager)) logger.debug("Creating temporary directory for site-config") self.tmp_dir = tempfile.mkdtemp() From d10a52a150c36567fb72e5ee60c9bee32b6b4b72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Wed, 30 Oct 2024 07:45:38 +0100 Subject: [PATCH 02/13] Add missing dependency --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index b344c88d1ec..cb508f2f76d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ dependencies = [ "netCDF4", "numpy<2", "openpyxl", # extra dependency for pandas (excel) + "opentelemetry-instrumentation-threading", "orjson", "packaging", "pandas", From 2c58ebd3c0bffeac9a0b29f2f4e3d0818b73352b Mon Sep 17 00:00:00 2001 From: Andreas Eknes Lie Date: Wed, 30 Oct 2024 09:15:05 +0100 Subject: [PATCH 03/13] Add opentelemetry api & sdk dependencies --- pyproject.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index cb508f2f76d..0431af2a926 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,8 @@ dependencies = [ "netCDF4", "numpy<2", "openpyxl", # extra dependency for pandas (excel) + "opentelemetry-api", + "opentelemetry-sdk", "opentelemetry-instrumentation-threading", "orjson", "packaging", From 0a5b658c22096e5f8a03900600e3359e68407b80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Wed, 30 Oct 2024 10:19:29 +0100 Subject: [PATCH 04/13] Add span for run model and run ensemble --- src/ert/run_models/base_run_model.py | 128 +++++++++++++++++---------- 1 file changed, 83 insertions(+), 45 deletions(-) diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py index fb7b0757462..8f605e84eb5 100644 --- a/src/ert/run_models/base_run_model.py +++ b/src/ert/run_models/base_run_model.py @@ -25,6 +25,8 @@ ) import numpy as np +from opentelemetry import trace +from opentelemetry.trace import Status, StatusCode from _ert.events import ( EESnapshot, @@ -323,51 +325,85 @@ def start_simulations_thread( evaluator_server_config: EvaluatorServerConfig, restart: bool = False, ) -> None: - failed = False - exception: Optional[Exception] = None - error_messages: MutableSequence[str] = [] - try: - self.start_time = int(time.time()) - self.stop_time = None - with captured_logs(error_messages): - self._set_default_env_context() - self.run_experiment( - evaluator_server_config=evaluator_server_config, - restart=restart, + tracer = trace.get_tracer("ert.main") + with tracer.start_as_current_span("ert.run_model.start") as span: + failed = False + exception: Optional[Exception] = None + error_messages: MutableSequence[str] = [] + try: + span.add_event( + "log", + { + "log.severity": "info", + "log.message": f"Starting simulation thread {self.__class__.__name__}", + }, ) - if self._completed_realizations_mask: - combined = np.logical_or( - np.array(self._completed_realizations_mask), - np.array(self.active_realizations), + self.start_time = int(time.time()) + self.stop_time = None + with captured_logs(error_messages): + self._set_default_env_context() + self.run_experiment( + evaluator_server_config=evaluator_server_config, + restart=restart, ) - self._completed_realizations_mask = list(combined) - else: - self._completed_realizations_mask = copy.copy( - self.active_realizations + if self._completed_realizations_mask: + combined = np.logical_or( + np.array(self._completed_realizations_mask), + np.array(self.active_realizations), + ) + self._completed_realizations_mask = list(combined) + else: + self._completed_realizations_mask = copy.copy( + self.active_realizations + ) + except ErtRunError as e: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(e) + span.add_event( + "log", + { + "log.severity": "exception", + "log.message": f'Simulation ended with error "{e}"', + }, + ) + self._completed_realizations_mask = [] + failed = True + exception = e + except UserWarning as e: + span.record_exception(e) + span.add_event( + "log", + { + "log.severity": "exception", + "log.message": f'Simulation ended with warning "{e}"', + }, + ) + except Exception as e: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(e) + span.add_event( + "log", + { + "log.severity": "exception", + "log.message": f'Simulation ended with error "{e}"', + }, + ) + failed = True + exception = e + finally: + self._clean_env_context() + self.stop_time = int(time.time()) + + self.send_event( + EndEvent( + failed=failed, + msg=( + self.format_error(exception, error_messages) + if failed + else "Experiment completed." + ), ) - except ErtRunError as e: - self._completed_realizations_mask = [] - failed = True - exception = e - except UserWarning: - pass - except Exception as e: - failed = True - exception = e - finally: - self._clean_env_context() - self.stop_time = int(time.time()) - - self.send_event( - EndEvent( - failed=failed, - msg=( - self.format_error(exception, error_messages) - if failed - else "Experiment completed." - ), ) - ) @abstractmethod def run_experiment( @@ -579,10 +615,12 @@ def run_ensemble_evaluator( ensemble: Ensemble, ee_config: EvaluatorServerConfig, ) -> List[int]: - successful_realizations = asyncio.run( - self.run_ensemble_evaluator_async(run_args, ensemble, ee_config) - ) - return successful_realizations + tracer = trace.get_tracer("ert.main") + with tracer.start_as_current_span("ert.run_model.run_ensemble"): + successful_realizations = asyncio.run( + self.run_ensemble_evaluator_async(run_args, ensemble, ee_config) + ) + return successful_realizations def _build_ensemble( self, From b55ca1d9c69140237dba1fee88b2bc8ee3326ab4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Wed, 30 Oct 2024 10:41:37 +0100 Subject: [PATCH 05/13] Set current span with decorator Set current span with deocrator instead of with statements and remove span.add_event functions as these are redundant to already existing logging --- src/ert/run_models/base_run_model.py | 130 ++++++++++----------------- 1 file changed, 48 insertions(+), 82 deletions(-) diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py index 8f605e84eb5..f77a12e91ca 100644 --- a/src/ert/run_models/base_run_model.py +++ b/src/ert/run_models/base_run_model.py @@ -26,7 +26,6 @@ import numpy as np from opentelemetry import trace -from opentelemetry.trace import Status, StatusCode from _ert.events import ( EESnapshot, @@ -85,6 +84,7 @@ RunModelUpdateEndEvent, ) +tracer = trace.get_tracer("ert.main") logger = logging.getLogger(__name__) if TYPE_CHECKING: @@ -320,90 +320,57 @@ def _clean_env_context(self) -> None: self._context_env.pop(key) os.environ.pop(key, None) + @tracer.start_as_current_span("ert.run_model.start") def start_simulations_thread( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False, ) -> None: - tracer = trace.get_tracer("ert.main") - with tracer.start_as_current_span("ert.run_model.start") as span: - failed = False - exception: Optional[Exception] = None - error_messages: MutableSequence[str] = [] - try: - span.add_event( - "log", - { - "log.severity": "info", - "log.message": f"Starting simulation thread {self.__class__.__name__}", - }, + failed = False + exception: Optional[Exception] = None + error_messages: MutableSequence[str] = [] + try: + self.start_time = int(time.time()) + self.stop_time = None + with captured_logs(error_messages): + self._set_default_env_context() + self.run_experiment( + evaluator_server_config=evaluator_server_config, + restart=restart, ) - self.start_time = int(time.time()) - self.stop_time = None - with captured_logs(error_messages): - self._set_default_env_context() - self.run_experiment( - evaluator_server_config=evaluator_server_config, - restart=restart, + if self._completed_realizations_mask: + combined = np.logical_or( + np.array(self._completed_realizations_mask), + np.array(self.active_realizations), ) - if self._completed_realizations_mask: - combined = np.logical_or( - np.array(self._completed_realizations_mask), - np.array(self.active_realizations), - ) - self._completed_realizations_mask = list(combined) - else: - self._completed_realizations_mask = copy.copy( - self.active_realizations - ) - except ErtRunError as e: - span.set_status(Status(StatusCode.ERROR)) - span.record_exception(e) - span.add_event( - "log", - { - "log.severity": "exception", - "log.message": f'Simulation ended with error "{e}"', - }, - ) - self._completed_realizations_mask = [] - failed = True - exception = e - except UserWarning as e: - span.record_exception(e) - span.add_event( - "log", - { - "log.severity": "exception", - "log.message": f'Simulation ended with warning "{e}"', - }, - ) - except Exception as e: - span.set_status(Status(StatusCode.ERROR)) - span.record_exception(e) - span.add_event( - "log", - { - "log.severity": "exception", - "log.message": f'Simulation ended with error "{e}"', - }, - ) - failed = True - exception = e - finally: - self._clean_env_context() - self.stop_time = int(time.time()) - - self.send_event( - EndEvent( - failed=failed, - msg=( - self.format_error(exception, error_messages) - if failed - else "Experiment completed." - ), + self._completed_realizations_mask = list(combined) + else: + self._completed_realizations_mask = copy.copy( + self.active_realizations ) + except ErtRunError as e: + self._completed_realizations_mask = [] + failed = True + exception = e + except UserWarning: + pass + except Exception as e: + failed = True + exception = e + finally: + self._clean_env_context() + self.stop_time = int(time.time()) + + self.send_event( + EndEvent( + failed=failed, + msg=( + self.format_error(exception, error_messages) + if failed + else "Experiment completed." + ), ) + ) @abstractmethod def run_experiment( @@ -609,18 +576,17 @@ async def run_ensemble_evaluator_async( return evaluator_task.result() # This function needs to be there for the sake of testing that expects sync ee run + @tracer.start_as_current_span("ert.run_model.run_ensemble") def run_ensemble_evaluator( self, run_args: List[RunArg], ensemble: Ensemble, ee_config: EvaluatorServerConfig, ) -> List[int]: - tracer = trace.get_tracer("ert.main") - with tracer.start_as_current_span("ert.run_model.run_ensemble"): - successful_realizations = asyncio.run( - self.run_ensemble_evaluator_async(run_args, ensemble, ee_config) - ) - return successful_realizations + successful_realizations = asyncio.run( + self.run_ensemble_evaluator_async(run_args, ensemble, ee_config) + ) + return successful_realizations def _build_ensemble( self, From 41fb8ef1b27628e526f418704e4f2069d0843906 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Wed, 30 Oct 2024 12:15:47 +0100 Subject: [PATCH 06/13] Extract ert.main tracer setup to own module --- src/ert/__main__.py | 13 ++----------- src/ert/run_models/base_run_model.py | 3 +-- src/ert/trace.py | 11 +++++++++++ 3 files changed, 14 insertions(+), 13 deletions(-) create mode 100644 src/ert/trace.py diff --git a/src/ert/__main__.py b/src/ert/__main__.py index 87b151d24d6..ddb9ef3f6b3 100755 --- a/src/ert/__main__.py +++ b/src/ert/__main__.py @@ -14,10 +14,7 @@ from uuid import UUID import yaml -from opentelemetry import trace from opentelemetry.instrumentation.threading import ThreadingInstrumentor -from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.sdk.trace import SpanLimits, TracerProvider from opentelemetry.trace import Status, StatusCode import ert.shared @@ -38,6 +35,7 @@ from ert.run_models.multiple_data_assimilation import MultipleDataAssimilation from ert.services import StorageService, WebvizErt from ert.shared.storage.command import add_parser_options as ert_api_add_parser_options +from ert.trace import tracer, tracer_provider from ert.validation import ( IntegerArgument, NumberListStringArgument, @@ -658,11 +656,6 @@ def main() -> None: # Have ErtThread re-raise uncaught exceptions on main thread set_signal_handler() ThreadingInstrumentor().instrument() - resource = Resource(attributes={SERVICE_NAME: "ert"}) - tracer_provider = TracerProvider( - resource=resource, span_limits=SpanLimits(max_events=128 * 16) - ) - trace.set_tracer_provider(tracer_provider) args = ert_parser(None, sys.argv[1:]) @@ -688,9 +681,7 @@ def main() -> None: handler.setLevel(logging.INFO) root_logger.addHandler(handler) - with trace.get_tracer("ert.main").start_as_current_span( - "ert.application.start" - ) as span: + with tracer.start_as_current_span("ert.application.start") as span: try: with ErtPluginContext( logger=logging.getLogger(), trace_provider=tracer_provider diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py index f77a12e91ca..3300585dcf4 100644 --- a/src/ert/run_models/base_run_model.py +++ b/src/ert/run_models/base_run_model.py @@ -25,7 +25,6 @@ ) import numpy as np -from opentelemetry import trace from _ert.events import ( EESnapshot, @@ -71,6 +70,7 @@ from ert.mode_definitions import MODULE_MODE from ert.runpaths import Runpaths from ert.storage import Ensemble, Storage +from ert.trace import tracer from ert.workflow_runner import WorkflowRunner from ..config.analysis_config import UpdateSettings @@ -84,7 +84,6 @@ RunModelUpdateEndEvent, ) -tracer = trace.get_tracer("ert.main") logger = logging.getLogger(__name__) if TYPE_CHECKING: diff --git a/src/ert/trace.py b/src/ert/trace.py new file mode 100644 index 00000000000..b68477b6022 --- /dev/null +++ b/src/ert/trace.py @@ -0,0 +1,11 @@ +from opentelemetry import trace +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import SpanLimits, TracerProvider + +resource = Resource(attributes={SERVICE_NAME: "ert"}) +tracer_provider = TracerProvider( + resource=resource, span_limits=SpanLimits(max_events=128 * 16) +) +trace.set_tracer_provider(tracer_provider) + +tracer = trace.get_tracer("ert.main") From e1970bbbfdbb8f96aa7aa806a6efb7847bf24fca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Wed, 30 Oct 2024 13:18:27 +0100 Subject: [PATCH 07/13] Set current span with decorator --- src/ert/__main__.py | 76 ++++++++++++++++++++++----------------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/src/ert/__main__.py b/src/ert/__main__.py index ddb9ef3f6b3..d46c8f41a09 100755 --- a/src/ert/__main__.py +++ b/src/ert/__main__.py @@ -35,7 +35,7 @@ from ert.run_models.multiple_data_assimilation import MultipleDataAssimilation from ert.services import StorageService, WebvizErt from ert.shared.storage.command import add_parser_options as ert_api_add_parser_options -from ert.trace import tracer, tracer_provider +from ert.trace import trace, tracer, tracer_provider from ert.validation import ( IntegerArgument, NumberListStringArgument, @@ -649,7 +649,9 @@ def log_process_usage() -> None: ) +@tracer.start_as_current_span("ert.application.start") def main() -> None: + span = trace.get_current_span() warnings.filterwarnings("ignore", category=DeprecationWarning) locale.setlocale(locale.LC_NUMERIC, "C") @@ -680,43 +682,41 @@ def main() -> None: handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.INFO) root_logger.addHandler(handler) - - with tracer.start_as_current_span("ert.application.start") as span: - try: - with ErtPluginContext( - logger=logging.getLogger(), trace_provider=tracer_provider - ) as context: - logger.info(f"Running ert with {args}") - args.func(args, context.plugin_manager) - except ErtCliError as err: - span.set_status(Status(StatusCode.ERROR)) - span.record_exception(err) - logger.debug(str(err)) - sys.exit(str(err)) - except ConfigValidationError as err: - span.set_status(Status(StatusCode.ERROR)) - span.record_exception(err) - err_msg = err.cli_message() - logger.debug(err_msg) - sys.exit(err_msg) - except BaseException as err: - span.set_status(Status(StatusCode.ERROR)) - span.record_exception(err) - logger.exception(f'ERT crashed unexpectedly with "{err}"') - - logfiles = set() # Use set to avoid duplicates... - for loghandler in logging.getLogger().handlers: - if isinstance(loghandler, logging.FileHandler): - logfiles.add(loghandler.baseFilename) - - msg = f'ERT crashed unexpectedly with "{err}".\nSee logfile(s) for details:' - msg += "\n " + "\n ".join(logfiles) - - sys.exit(msg) - finally: - log_process_usage() - os.environ.pop("ERT_LOG_DIR") - ThreadingInstrumentor().uninstrument() + try: + with ErtPluginContext( + logger=logging.getLogger(), trace_provider=tracer_provider + ) as context: + logger.info(f"Running ert with {args}") + args.func(args, context.plugin_manager) + except ErtCliError as err: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(err) + logger.debug(str(err)) + sys.exit(str(err)) + except ConfigValidationError as err: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(err) + err_msg = err.cli_message() + logger.debug(err_msg) + sys.exit(err_msg) + except BaseException as err: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(err) + logger.exception(f'ERT crashed unexpectedly with "{err}"') + + logfiles = set() # Use set to avoid duplicates... + for loghandler in logging.getLogger().handlers: + if isinstance(loghandler, logging.FileHandler): + logfiles.add(loghandler.baseFilename) + + msg = f'ERT crashed unexpectedly with "{err}".\nSee logfile(s) for details:' + msg += "\n " + "\n ".join(logfiles) + + sys.exit(msg) + finally: + log_process_usage() + os.environ.pop("ERT_LOG_DIR") + ThreadingInstrumentor().uninstrument() if __name__ == "__main__": From f8b265c91391bb64258b623b7c520bf1947e01a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Thu, 31 Oct 2024 08:12:59 +0100 Subject: [PATCH 08/13] Add span for run_experiment --- src/ert/run_models/ensemble_experiment.py | 2 ++ src/ert/run_models/ensemble_smoother.py | 2 ++ src/ert/run_models/evaluate_ensemble.py | 2 ++ src/ert/run_models/iterated_ensemble_smoother.py | 2 ++ src/ert/run_models/multiple_data_assimilation.py | 2 ++ 5 files changed, 10 insertions(+) diff --git a/src/ert/run_models/ensemble_experiment.py b/src/ert/run_models/ensemble_experiment.py index 25348477b8f..2627cfb15c8 100644 --- a/src/ert/run_models/ensemble_experiment.py +++ b/src/ert/run_models/ensemble_experiment.py @@ -9,6 +9,7 @@ from ert.enkf_main import sample_prior from ert.ensemble_evaluator import EvaluatorServerConfig from ert.storage import Ensemble, Experiment, Storage +from ert.trace import tracer from ..run_arg import create_run_arguments from .base_run_model import BaseRunModel, StatusEvents @@ -55,6 +56,7 @@ def __init__( minimum_required_realizations=minimum_required_realizations, ) + @tracer.start_as_current_span(f"ert.{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, diff --git a/src/ert/run_models/ensemble_smoother.py b/src/ert/run_models/ensemble_smoother.py index 547efdfcd3e..550a5e25028 100644 --- a/src/ert/run_models/ensemble_smoother.py +++ b/src/ert/run_models/ensemble_smoother.py @@ -10,6 +10,7 @@ from ert.enkf_main import sample_prior from ert.ensemble_evaluator import EvaluatorServerConfig from ert.storage import Storage +from ert.trace import tracer from ..config.analysis_config import UpdateSettings from ..config.analysis_module import ESSettings @@ -56,6 +57,7 @@ def __init__( self.support_restart = False + @tracer.start_as_current_span(f"ert.{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: diff --git a/src/ert/run_models/evaluate_ensemble.py b/src/ert/run_models/evaluate_ensemble.py index 00737982bc1..74a4175f18e 100644 --- a/src/ert/run_models/evaluate_ensemble.py +++ b/src/ert/run_models/evaluate_ensemble.py @@ -8,6 +8,7 @@ from ert.ensemble_evaluator import EvaluatorServerConfig from ert.storage import Storage +from ert.trace import tracer from ..run_arg import create_run_arguments from . import BaseRunModel @@ -58,6 +59,7 @@ def __init__( random_seed=random_seed, ) + @tracer.start_as_current_span(f"ert.{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: diff --git a/src/ert/run_models/iterated_ensemble_smoother.py b/src/ert/run_models/iterated_ensemble_smoother.py index 2a674e062fd..0a1daaf2fb9 100644 --- a/src/ert/run_models/iterated_ensemble_smoother.py +++ b/src/ert/run_models/iterated_ensemble_smoother.py @@ -13,6 +13,7 @@ from ert.enkf_main import sample_prior from ert.ensemble_evaluator import EvaluatorServerConfig from ert.storage import Ensemble, Storage +from ert.trace import tracer from ..config.analysis_config import UpdateSettings from ..config.analysis_module import IESSettings @@ -117,6 +118,7 @@ def analyzeStep( ) from e self.run_workflows(HookRuntime.POST_UPDATE, self._storage, posterior_storage) + @tracer.start_as_current_span(f"ert.{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: diff --git a/src/ert/run_models/multiple_data_assimilation.py b/src/ert/run_models/multiple_data_assimilation.py index eb394b852b6..6466dce9eab 100644 --- a/src/ert/run_models/multiple_data_assimilation.py +++ b/src/ert/run_models/multiple_data_assimilation.py @@ -11,6 +11,7 @@ from ert.enkf_main import sample_prior from ert.ensemble_evaluator import EvaluatorServerConfig from ert.storage import Ensemble, Storage +from ert.trace import tracer from ..config.analysis_config import UpdateSettings from ..config.analysis_module import ESSettings @@ -79,6 +80,7 @@ def __init__( minimum_required_realizations=minimum_required_realizations, ) + @tracer.start_as_current_span(f"ert.{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: From 28ad85a5b5b42d4edc5847a15580af1c448bd1c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Thu, 31 Oct 2024 08:13:13 +0100 Subject: [PATCH 09/13] Add span for run_workflows --- src/ert/run_models/base_run_model.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py index 3300585dcf4..efcc9383fb0 100644 --- a/src/ert/run_models/base_run_model.py +++ b/src/ert/run_models/base_run_model.py @@ -657,6 +657,7 @@ def validate(self) -> None: f"({min_realization_count})" ) + @tracer.start_as_current_span("ert.run_model.run_workflows") def run_workflows( self, runtime: HookRuntime, storage: Storage, ensemble: Ensemble ) -> None: From 2d2132bff374d0a7aae6bd53c364ec10ff13cac3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Thu, 31 Oct 2024 09:24:15 +0100 Subject: [PATCH 10/13] Align span names with logger name --- src/ert/run_models/base_run_model.py | 6 +++--- src/ert/run_models/ensemble_experiment.py | 2 +- src/ert/run_models/ensemble_smoother.py | 2 +- src/ert/run_models/evaluate_ensemble.py | 2 +- src/ert/run_models/iterated_ensemble_smoother.py | 2 +- src/ert/run_models/multiple_data_assimilation.py | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py index efcc9383fb0..5e1dfed32fc 100644 --- a/src/ert/run_models/base_run_model.py +++ b/src/ert/run_models/base_run_model.py @@ -319,7 +319,7 @@ def _clean_env_context(self) -> None: self._context_env.pop(key) os.environ.pop(key, None) - @tracer.start_as_current_span("ert.run_model.start") + @tracer.start_as_current_span(f"{__name__}.start_simulations_thread") def start_simulations_thread( self, evaluator_server_config: EvaluatorServerConfig, @@ -575,7 +575,7 @@ async def run_ensemble_evaluator_async( return evaluator_task.result() # This function needs to be there for the sake of testing that expects sync ee run - @tracer.start_as_current_span("ert.run_model.run_ensemble") + @tracer.start_as_current_span(f"{__name__}.run_ensemble_evaluator") def run_ensemble_evaluator( self, run_args: List[RunArg], @@ -657,7 +657,7 @@ def validate(self) -> None: f"({min_realization_count})" ) - @tracer.start_as_current_span("ert.run_model.run_workflows") + @tracer.start_as_current_span(f"{__name__}.run_workflows") def run_workflows( self, runtime: HookRuntime, storage: Storage, ensemble: Ensemble ) -> None: diff --git a/src/ert/run_models/ensemble_experiment.py b/src/ert/run_models/ensemble_experiment.py index 2627cfb15c8..13d8a3a8411 100644 --- a/src/ert/run_models/ensemble_experiment.py +++ b/src/ert/run_models/ensemble_experiment.py @@ -56,7 +56,7 @@ def __init__( minimum_required_realizations=minimum_required_realizations, ) - @tracer.start_as_current_span(f"ert.{__name__}.run_experiment") + @tracer.start_as_current_span(f"{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, diff --git a/src/ert/run_models/ensemble_smoother.py b/src/ert/run_models/ensemble_smoother.py index 550a5e25028..b745b2df0a5 100644 --- a/src/ert/run_models/ensemble_smoother.py +++ b/src/ert/run_models/ensemble_smoother.py @@ -57,7 +57,7 @@ def __init__( self.support_restart = False - @tracer.start_as_current_span(f"ert.{__name__}.run_experiment") + @tracer.start_as_current_span(f"{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: diff --git a/src/ert/run_models/evaluate_ensemble.py b/src/ert/run_models/evaluate_ensemble.py index 74a4175f18e..aa67f42b07c 100644 --- a/src/ert/run_models/evaluate_ensemble.py +++ b/src/ert/run_models/evaluate_ensemble.py @@ -59,7 +59,7 @@ def __init__( random_seed=random_seed, ) - @tracer.start_as_current_span(f"ert.{__name__}.run_experiment") + @tracer.start_as_current_span(f"{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: diff --git a/src/ert/run_models/iterated_ensemble_smoother.py b/src/ert/run_models/iterated_ensemble_smoother.py index 0a1daaf2fb9..db03d299390 100644 --- a/src/ert/run_models/iterated_ensemble_smoother.py +++ b/src/ert/run_models/iterated_ensemble_smoother.py @@ -118,7 +118,7 @@ def analyzeStep( ) from e self.run_workflows(HookRuntime.POST_UPDATE, self._storage, posterior_storage) - @tracer.start_as_current_span(f"ert.{__name__}.run_experiment") + @tracer.start_as_current_span(f"{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: diff --git a/src/ert/run_models/multiple_data_assimilation.py b/src/ert/run_models/multiple_data_assimilation.py index 6466dce9eab..ac30649c6dc 100644 --- a/src/ert/run_models/multiple_data_assimilation.py +++ b/src/ert/run_models/multiple_data_assimilation.py @@ -80,7 +80,7 @@ def __init__( minimum_required_realizations=minimum_required_realizations, ) - @tracer.start_as_current_span(f"ert.{__name__}.run_experiment") + @tracer.start_as_current_span(f"{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: From 3e24f50e923a89994fcc0e09343e20a46cae915e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Thu, 31 Oct 2024 10:27:12 +0100 Subject: [PATCH 11/13] Add span for realizations --- src/ert/scheduler/job.py | 50 +++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py index 74767aea562..2d817de6efa 100644 --- a/src/ert/scheduler/job.py +++ b/src/ert/scheduler/job.py @@ -17,6 +17,7 @@ from ert.constant_filenames import ERROR_file from ert.load_status import LoadStatus from ert.storage.realization_storage_state import RealizationStorageState +from ert.trace import tracer from .driver import Driver, FailedSubmit @@ -147,30 +148,31 @@ async def run( checksum_lock: asyncio.Lock, max_submit: int = 1, ) -> None: - self._requested_max_submit = max_submit - for attempt in range(max_submit): - await self._submit_and_run_once(sem) - - if self.returncode.cancelled() or self._scheduler._cancelled: - break - - if self.returncode.result() == 0: - if self._scheduler._manifest_queue is not None: - await self._verify_checksum(checksum_lock) - async with forward_model_ok_lock: - await self._handle_finished_forward_model() - break - - if attempt < max_submit - 1: - message = ( - f"Realization {self.iens} failed, " - f"resubmitting for attempt {attempt+2} of {max_submit}" - ) - logger.warning(message) - self.returncode = asyncio.Future() - self.started.clear() - else: - await self._send(JobState.FAILED) + with tracer.start_as_current_span(f"{__name__}.run.realization_{self.iens}"): + self._requested_max_submit = max_submit + for attempt in range(max_submit): + await self._submit_and_run_once(sem) + + if self.returncode.cancelled() or self._scheduler._cancelled: + break + + if self.returncode.result() == 0: + if self._scheduler._manifest_queue is not None: + await self._verify_checksum(checksum_lock) + async with forward_model_ok_lock: + await self._handle_finished_forward_model() + break + + if attempt < max_submit - 1: + message = ( + f"Realization {self.iens} failed, " + f"resubmitting for attempt {attempt+2} of {max_submit}" + ) + logger.warning(message) + self.returncode = asyncio.Future() + self.started.clear() + else: + await self._send(JobState.FAILED) async def _max_runtime_task(self) -> None: assert self.real.max_runtime is not None From 6b7c08db41387d5a31a70bb4805b10cba03d5307 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Thu, 31 Oct 2024 12:58:14 +0100 Subject: [PATCH 12/13] Add test for add_span_processor hook --- tests/ert/unit_tests/plugins/dummy_plugins.py | 11 +++++++++++ .../ert/unit_tests/plugins/test_plugin_manager.py | 15 +++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/tests/ert/unit_tests/plugins/dummy_plugins.py b/tests/ert/unit_tests/plugins/dummy_plugins.py index 7eda3bcc899..fa1c7536654 100644 --- a/tests/ert/unit_tests/plugins/dummy_plugins.py +++ b/tests/ert/unit_tests/plugins/dummy_plugins.py @@ -1,4 +1,7 @@ import logging +from io import StringIO + +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from ert import ForwardModelStepPlugin, plugin @@ -76,6 +79,14 @@ def add_log_handle_to_root(): return fh +span_output = StringIO() + + +@plugin(name="dummy") +def add_span_processor(): + return BatchSpanProcessor(ConsoleSpanExporter(out=span_output)) + + class DummyFMStep(ForwardModelStepPlugin): def __init__(self): super().__init__(name="DummyForwardModel", command=["touch", "dummy.out"]) diff --git a/tests/ert/unit_tests/plugins/test_plugin_manager.py b/tests/ert/unit_tests/plugins/test_plugin_manager.py index 3c000cf33a8..679278635ca 100644 --- a/tests/ert/unit_tests/plugins/test_plugin_manager.py +++ b/tests/ert/unit_tests/plugins/test_plugin_manager.py @@ -1,7 +1,10 @@ +import json import logging import tempfile from unittest.mock import Mock +from opentelemetry.sdk.trace import TracerProvider + import ert.plugins.hook_implementations from ert.plugins import ErtPluginManager from tests.ert.unit_tests.plugins import dummy_plugins @@ -115,6 +118,18 @@ def test_add_logging_handle(tmpdir): assert "I should write this to spam.log" in result +def test_add_span_processor(): + pm = ErtPluginManager(plugins=[dummy_plugins]) + tracer_provider = TracerProvider() + tracer = tracer_provider.get_tracer("ert.tests") + pm.add_span_processor_to_trace_provider(tracer_provider) + with tracer.start_as_current_span("test_span"): + print("test_span") + tracer_provider.force_flush() + span_info = json.loads(dummy_plugins.span_output.getvalue()) + assert span_info["name"] == "test_span" + + def test_that_forward_model_step_is_registered(tmpdir): with tmpdir.as_cwd(): pm = ErtPluginManager(plugins=[dummy_plugins]) From 4c0127f7572d025d27ddc3c7edef43de95ef4fa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Fri, 1 Nov 2024 09:19:45 +0100 Subject: [PATCH 13/13] fixup! Add test for add_span_processor hook --- tests/ert/unit_tests/plugins/test_plugin_manager.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/ert/unit_tests/plugins/test_plugin_manager.py b/tests/ert/unit_tests/plugins/test_plugin_manager.py index 679278635ca..18a9bdcdbd2 100644 --- a/tests/ert/unit_tests/plugins/test_plugin_manager.py +++ b/tests/ert/unit_tests/plugins/test_plugin_manager.py @@ -123,11 +123,15 @@ def test_add_span_processor(): tracer_provider = TracerProvider() tracer = tracer_provider.get_tracer("ert.tests") pm.add_span_processor_to_trace_provider(tracer_provider) - with tracer.start_as_current_span("test_span"): - print("test_span") + with tracer.start_as_current_span("span_1"): + print("do_something") + with tracer.start_as_current_span("span_2"): + print("do_something_else") tracer_provider.force_flush() - span_info = json.loads(dummy_plugins.span_output.getvalue()) - assert span_info["name"] == "test_span" + span_info = "[" + dummy_plugins.span_output.getvalue().replace("}\n{", "},{") + "]" + span_info = json.loads(span_info) + span_info = {span["name"]: span for span in span_info} + assert span_info["span_2"]["parent_id"] == span_info["span_1"]["context"]["span_id"] def test_that_forward_model_step_is_registered(tmpdir):