diff --git a/pyproject.toml b/pyproject.toml index b344c88d1ec..0431af2a926 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,9 @@ dependencies = [ "netCDF4", "numpy<2", "openpyxl", # extra dependency for pandas (excel) + "opentelemetry-api", + "opentelemetry-sdk", + "opentelemetry-instrumentation-threading", "orjson", "packaging", "pandas", diff --git a/src/ert/__main__.py b/src/ert/__main__.py index aab9cdcbbb9..d46c8f41a09 100755 --- a/src/ert/__main__.py +++ b/src/ert/__main__.py @@ -14,6 +14,8 @@ from uuid import UUID import yaml +from opentelemetry.instrumentation.threading import ThreadingInstrumentor +from opentelemetry.trace import Status, StatusCode import ert.shared from _ert.threading import set_signal_handler @@ -33,6 +35,7 @@ from ert.run_models.multiple_data_assimilation import MultipleDataAssimilation from ert.services import StorageService, WebvizErt from ert.shared.storage.command import add_parser_options as ert_api_add_parser_options +from ert.trace import trace, tracer, tracer_provider from ert.validation import ( IntegerArgument, NumberListStringArgument, @@ -646,12 +649,15 @@ def log_process_usage() -> None: ) +@tracer.start_as_current_span("ert.application.start") def main() -> None: + span = trace.get_current_span() warnings.filterwarnings("ignore", category=DeprecationWarning) locale.setlocale(locale.LC_NUMERIC, "C") # Have ErtThread re-raise uncaught exceptions on main thread set_signal_handler() + ThreadingInstrumentor().instrument() args = ert_parser(None, sys.argv[1:]) @@ -676,19 +682,26 @@ def main() -> None: handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.INFO) root_logger.addHandler(handler) - try: - with ErtPluginContext(logger=logging.getLogger()) as context: + with ErtPluginContext( + logger=logging.getLogger(), trace_provider=tracer_provider + ) as context: logger.info(f"Running ert with {args}") args.func(args, context.plugin_manager) except ErtCliError as err: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(err) logger.debug(str(err)) sys.exit(str(err)) except ConfigValidationError as err: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(err) err_msg = err.cli_message() logger.debug(err_msg) sys.exit(err_msg) except BaseException as err: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(err) logger.exception(f'ERT crashed unexpectedly with "{err}"') logfiles = set() # Use set to avoid duplicates... @@ -703,6 +716,7 @@ def main() -> None: finally: log_process_usage() os.environ.pop("ERT_LOG_DIR") + ThreadingInstrumentor().uninstrument() if __name__ == "__main__": diff --git a/src/ert/plugins/hook_specifications/__init__.py b/src/ert/plugins/hook_specifications/__init__.py index 0f76b317564..60f946f9dc5 100644 --- a/src/ert/plugins/hook_specifications/__init__.py +++ b/src/ert/plugins/hook_specifications/__init__.py @@ -13,11 +13,15 @@ job_documentation, legacy_ertscript_workflow, ) -from .logging import add_log_handle_to_root +from .logging import ( + add_log_handle_to_root, + add_span_processor, +) from .site_config import site_config_lines __all__ = [ "add_log_handle_to_root", + "add_span_processor", "ecl100_config_path", "ecl300_config_path", "flow_config_path", diff --git a/src/ert/plugins/hook_specifications/logging.py b/src/ert/plugins/hook_specifications/logging.py index 888c875efae..e8f07bf32ff 100644 --- a/src/ert/plugins/hook_specifications/logging.py +++ b/src/ert/plugins/hook_specifications/logging.py @@ -1,5 +1,7 @@ import logging +from opentelemetry.sdk.trace.export import BatchSpanProcessor + from ert.plugins.plugin_manager import hook_specification @@ -11,3 +13,13 @@ def add_log_handle_to_root() -> logging.Handler: # type: ignore :return: A log handle that will be added to the root logger """ + + +@hook_specification +def add_span_processor() -> BatchSpanProcessor: # type: ignore + """ + Create a BatchSpanProcessor which will be added to the trace provider + in ert. + + :return: A BatchSpanProcessor that will be added to the trace provider in ert + """ diff --git a/src/ert/plugins/plugin_manager.py b/src/ert/plugins/plugin_manager.py index 78e341f260e..1744f63d646 100644 --- a/src/ert/plugins/plugin_manager.py +++ b/src/ert/plugins/plugin_manager.py @@ -26,6 +26,7 @@ ) import pluggy +from opentelemetry.sdk.trace import TracerProvider from .workflow_config import WorkflowConfigs @@ -322,17 +323,26 @@ def add_logging_handle_to_root(self, logger: logging.Logger) -> None: for handle in handles: logger.addHandler(handle) + def add_span_processor_to_trace_provider( + self, trace_provider: TracerProvider + ) -> None: + span_processors = self.hook.add_span_processor() + for span_processor in span_processors: + trace_provider.add_span_processor(span_processor) + class ErtPluginContext: def __init__( self, plugins: Optional[List[object]] = None, logger: Optional[logging.Logger] = None, + trace_provider: Optional[TracerProvider] = None, ) -> None: self.plugin_manager = ErtPluginManager(plugins=plugins) self.tmp_dir: Optional[str] = None self.tmp_site_config_filename: Optional[str] = None self._logger = logger + self._trace_provider = trace_provider def _create_site_config(self, tmp_dir: str) -> Optional[str]: site_config_content = self.plugin_manager.get_site_config_content() @@ -348,6 +358,10 @@ def _create_site_config(self, tmp_dir: str) -> Optional[str]: def __enter__(self) -> ErtPluginContext: if self._logger is not None: self.plugin_manager.add_logging_handle_to_root(logger=self._logger) + if self._trace_provider is not None: + self.plugin_manager.add_span_processor_to_trace_provider( + trace_provider=self._trace_provider + ) logger.debug(str(self.plugin_manager)) logger.debug("Creating temporary directory for site-config") self.tmp_dir = tempfile.mkdtemp() diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py index fb7b0757462..5e1dfed32fc 100644 --- a/src/ert/run_models/base_run_model.py +++ b/src/ert/run_models/base_run_model.py @@ -70,6 +70,7 @@ from ert.mode_definitions import MODULE_MODE from ert.runpaths import Runpaths from ert.storage import Ensemble, Storage +from ert.trace import tracer from ert.workflow_runner import WorkflowRunner from ..config.analysis_config import UpdateSettings @@ -318,6 +319,7 @@ def _clean_env_context(self) -> None: self._context_env.pop(key) os.environ.pop(key, None) + @tracer.start_as_current_span(f"{__name__}.start_simulations_thread") def start_simulations_thread( self, evaluator_server_config: EvaluatorServerConfig, @@ -573,6 +575,7 @@ async def run_ensemble_evaluator_async( return evaluator_task.result() # This function needs to be there for the sake of testing that expects sync ee run + @tracer.start_as_current_span(f"{__name__}.run_ensemble_evaluator") def run_ensemble_evaluator( self, run_args: List[RunArg], @@ -654,6 +657,7 @@ def validate(self) -> None: f"({min_realization_count})" ) + @tracer.start_as_current_span(f"{__name__}.run_workflows") def run_workflows( self, runtime: HookRuntime, storage: Storage, ensemble: Ensemble ) -> None: diff --git a/src/ert/run_models/ensemble_experiment.py b/src/ert/run_models/ensemble_experiment.py index 25348477b8f..13d8a3a8411 100644 --- a/src/ert/run_models/ensemble_experiment.py +++ b/src/ert/run_models/ensemble_experiment.py @@ -9,6 +9,7 @@ from ert.enkf_main import sample_prior from ert.ensemble_evaluator import EvaluatorServerConfig from ert.storage import Ensemble, Experiment, Storage +from ert.trace import tracer from ..run_arg import create_run_arguments from .base_run_model import BaseRunModel, StatusEvents @@ -55,6 +56,7 @@ def __init__( minimum_required_realizations=minimum_required_realizations, ) + @tracer.start_as_current_span(f"{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, diff --git a/src/ert/run_models/ensemble_smoother.py b/src/ert/run_models/ensemble_smoother.py index 547efdfcd3e..b745b2df0a5 100644 --- a/src/ert/run_models/ensemble_smoother.py +++ b/src/ert/run_models/ensemble_smoother.py @@ -10,6 +10,7 @@ from ert.enkf_main import sample_prior from ert.ensemble_evaluator import EvaluatorServerConfig from ert.storage import Storage +from ert.trace import tracer from ..config.analysis_config import UpdateSettings from ..config.analysis_module import ESSettings @@ -56,6 +57,7 @@ def __init__( self.support_restart = False + @tracer.start_as_current_span(f"{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: diff --git a/src/ert/run_models/evaluate_ensemble.py b/src/ert/run_models/evaluate_ensemble.py index 00737982bc1..aa67f42b07c 100644 --- a/src/ert/run_models/evaluate_ensemble.py +++ b/src/ert/run_models/evaluate_ensemble.py @@ -8,6 +8,7 @@ from ert.ensemble_evaluator import EvaluatorServerConfig from ert.storage import Storage +from ert.trace import tracer from ..run_arg import create_run_arguments from . import BaseRunModel @@ -58,6 +59,7 @@ def __init__( random_seed=random_seed, ) + @tracer.start_as_current_span(f"{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: diff --git a/src/ert/run_models/iterated_ensemble_smoother.py b/src/ert/run_models/iterated_ensemble_smoother.py index 2a674e062fd..db03d299390 100644 --- a/src/ert/run_models/iterated_ensemble_smoother.py +++ b/src/ert/run_models/iterated_ensemble_smoother.py @@ -13,6 +13,7 @@ from ert.enkf_main import sample_prior from ert.ensemble_evaluator import EvaluatorServerConfig from ert.storage import Ensemble, Storage +from ert.trace import tracer from ..config.analysis_config import UpdateSettings from ..config.analysis_module import IESSettings @@ -117,6 +118,7 @@ def analyzeStep( ) from e self.run_workflows(HookRuntime.POST_UPDATE, self._storage, posterior_storage) + @tracer.start_as_current_span(f"{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: diff --git a/src/ert/run_models/multiple_data_assimilation.py b/src/ert/run_models/multiple_data_assimilation.py index eb394b852b6..ac30649c6dc 100644 --- a/src/ert/run_models/multiple_data_assimilation.py +++ b/src/ert/run_models/multiple_data_assimilation.py @@ -11,6 +11,7 @@ from ert.enkf_main import sample_prior from ert.ensemble_evaluator import EvaluatorServerConfig from ert.storage import Ensemble, Storage +from ert.trace import tracer from ..config.analysis_config import UpdateSettings from ..config.analysis_module import ESSettings @@ -79,6 +80,7 @@ def __init__( minimum_required_realizations=minimum_required_realizations, ) + @tracer.start_as_current_span(f"{__name__}.run_experiment") def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py index 74767aea562..2d817de6efa 100644 --- a/src/ert/scheduler/job.py +++ b/src/ert/scheduler/job.py @@ -17,6 +17,7 @@ from ert.constant_filenames import ERROR_file from ert.load_status import LoadStatus from ert.storage.realization_storage_state import RealizationStorageState +from ert.trace import tracer from .driver import Driver, FailedSubmit @@ -147,30 +148,31 @@ async def run( checksum_lock: asyncio.Lock, max_submit: int = 1, ) -> None: - self._requested_max_submit = max_submit - for attempt in range(max_submit): - await self._submit_and_run_once(sem) - - if self.returncode.cancelled() or self._scheduler._cancelled: - break - - if self.returncode.result() == 0: - if self._scheduler._manifest_queue is not None: - await self._verify_checksum(checksum_lock) - async with forward_model_ok_lock: - await self._handle_finished_forward_model() - break - - if attempt < max_submit - 1: - message = ( - f"Realization {self.iens} failed, " - f"resubmitting for attempt {attempt+2} of {max_submit}" - ) - logger.warning(message) - self.returncode = asyncio.Future() - self.started.clear() - else: - await self._send(JobState.FAILED) + with tracer.start_as_current_span(f"{__name__}.run.realization_{self.iens}"): + self._requested_max_submit = max_submit + for attempt in range(max_submit): + await self._submit_and_run_once(sem) + + if self.returncode.cancelled() or self._scheduler._cancelled: + break + + if self.returncode.result() == 0: + if self._scheduler._manifest_queue is not None: + await self._verify_checksum(checksum_lock) + async with forward_model_ok_lock: + await self._handle_finished_forward_model() + break + + if attempt < max_submit - 1: + message = ( + f"Realization {self.iens} failed, " + f"resubmitting for attempt {attempt+2} of {max_submit}" + ) + logger.warning(message) + self.returncode = asyncio.Future() + self.started.clear() + else: + await self._send(JobState.FAILED) async def _max_runtime_task(self) -> None: assert self.real.max_runtime is not None diff --git a/src/ert/trace.py b/src/ert/trace.py new file mode 100644 index 00000000000..b68477b6022 --- /dev/null +++ b/src/ert/trace.py @@ -0,0 +1,11 @@ +from opentelemetry import trace +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import SpanLimits, TracerProvider + +resource = Resource(attributes={SERVICE_NAME: "ert"}) +tracer_provider = TracerProvider( + resource=resource, span_limits=SpanLimits(max_events=128 * 16) +) +trace.set_tracer_provider(tracer_provider) + +tracer = trace.get_tracer("ert.main") diff --git a/tests/ert/unit_tests/plugins/dummy_plugins.py b/tests/ert/unit_tests/plugins/dummy_plugins.py index 7eda3bcc899..fa1c7536654 100644 --- a/tests/ert/unit_tests/plugins/dummy_plugins.py +++ b/tests/ert/unit_tests/plugins/dummy_plugins.py @@ -1,4 +1,7 @@ import logging +from io import StringIO + +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from ert import ForwardModelStepPlugin, plugin @@ -76,6 +79,14 @@ def add_log_handle_to_root(): return fh +span_output = StringIO() + + +@plugin(name="dummy") +def add_span_processor(): + return BatchSpanProcessor(ConsoleSpanExporter(out=span_output)) + + class DummyFMStep(ForwardModelStepPlugin): def __init__(self): super().__init__(name="DummyForwardModel", command=["touch", "dummy.out"]) diff --git a/tests/ert/unit_tests/plugins/test_plugin_manager.py b/tests/ert/unit_tests/plugins/test_plugin_manager.py index 3c000cf33a8..18a9bdcdbd2 100644 --- a/tests/ert/unit_tests/plugins/test_plugin_manager.py +++ b/tests/ert/unit_tests/plugins/test_plugin_manager.py @@ -1,7 +1,10 @@ +import json import logging import tempfile from unittest.mock import Mock +from opentelemetry.sdk.trace import TracerProvider + import ert.plugins.hook_implementations from ert.plugins import ErtPluginManager from tests.ert.unit_tests.plugins import dummy_plugins @@ -115,6 +118,22 @@ def test_add_logging_handle(tmpdir): assert "I should write this to spam.log" in result +def test_add_span_processor(): + pm = ErtPluginManager(plugins=[dummy_plugins]) + tracer_provider = TracerProvider() + tracer = tracer_provider.get_tracer("ert.tests") + pm.add_span_processor_to_trace_provider(tracer_provider) + with tracer.start_as_current_span("span_1"): + print("do_something") + with tracer.start_as_current_span("span_2"): + print("do_something_else") + tracer_provider.force_flush() + span_info = "[" + dummy_plugins.span_output.getvalue().replace("}\n{", "},{") + "]" + span_info = json.loads(span_info) + span_info = {span["name"]: span for span in span_info} + assert span_info["span_2"]["parent_id"] == span_info["span_1"]["context"]["span_id"] + + def test_that_forward_model_step_is_registered(tmpdir): with tmpdir.as_cwd(): pm = ErtPluginManager(plugins=[dummy_plugins])