From ba1c60253311ee1b0f668877c5da12b7508dac6b Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Sat, 21 Sep 2024 15:57:58 +0800 Subject: [PATCH] (feat): Add opensearch logging integration (#41799) * Add feature to read log from opensearch --- .../airflow_local_settings.py | 31 +- airflow/config_templates/unit_tests.cfg | 24 + airflow/providers/opensearch/log/__init__.py | 16 + .../opensearch/log/os_json_formatter.py | 43 ++ .../providers/opensearch/log/os_response.py | 168 ++++++ .../opensearch/log/os_task_handler.py | 567 ++++++++++++++++++ airflow/providers/opensearch/provider.yaml | 143 +++++ airflow/utils/db.py | 10 + .../configurations-ref.rst | 18 + .../index.rst | 2 + .../logging/index.rst | 72 +++ tests/core/test_configuration.py | 2 + tests/providers/opensearch/conftest.py | 120 ++++ tests/providers/opensearch/log/__init__.py | 16 + .../opensearch/log/test_os_json_formatter.py | 92 +++ .../opensearch/log/test_os_response.py | 134 +++++ .../opensearch/log/test_os_task_handler.py | 499 +++++++++++++++ 17 files changed, 1956 insertions(+), 1 deletion(-) create mode 100644 airflow/providers/opensearch/log/__init__.py create mode 100644 airflow/providers/opensearch/log/os_json_formatter.py create mode 100644 airflow/providers/opensearch/log/os_response.py create mode 100644 airflow/providers/opensearch/log/os_task_handler.py create mode 100644 docs/apache-airflow-providers-opensearch/configurations-ref.rst create mode 100644 docs/apache-airflow-providers-opensearch/logging/index.rst create mode 100644 tests/providers/opensearch/log/__init__.py create mode 100644 tests/providers/opensearch/log/test_os_json_formatter.py create mode 100644 tests/providers/opensearch/log/test_os_response.py create mode 100644 tests/providers/opensearch/log/test_os_task_handler.py diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index 2ce5c8378229..cee5b428f6db 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -208,7 +208,7 @@ if REMOTE_LOGGING: ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST") - + OPENSEARCH_HOST: str | None = conf.get("opensearch", "HOST") # Storage bucket URL for remote logging # S3 buckets should start with "s3://" # Cloudwatch log groups should start with "cloudwatch://" @@ -330,6 +330,35 @@ } DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS) + elif OPENSEARCH_HOST: + OPENSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("opensearch", "END_OF_LOG_MARK") + OPENSEARCH_PORT: str = conf.get_mandatory_value("opensearch", "PORT") + OPENSEARCH_USERNAME: str = conf.get_mandatory_value("opensearch", "USERNAME") + OPENSEARCH_PASSWORD: str = conf.get_mandatory_value("opensearch", "PASSWORD") + OPENSEARCH_WRITE_STDOUT: bool = conf.getboolean("opensearch", "WRITE_STDOUT") + OPENSEARCH_JSON_FORMAT: bool = conf.getboolean("opensearch", "JSON_FORMAT") + OPENSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("opensearch", "JSON_FIELDS") + OPENSEARCH_HOST_FIELD: str = conf.get_mandatory_value("opensearch", "HOST_FIELD") + OPENSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("opensearch", "OFFSET_FIELD") + + OPENSEARCH_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = { + "task": { + "class": "airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler", + "formatter": "airflow", + "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), + "end_of_log_mark": OPENSEARCH_END_OF_LOG_MARK, + "host": 
OPENSEARCH_HOST, + "port": OPENSEARCH_PORT, + "username": OPENSEARCH_USERNAME, + "password": OPENSEARCH_PASSWORD, + "write_stdout": OPENSEARCH_WRITE_STDOUT, + "json_format": OPENSEARCH_JSON_FORMAT, + "json_fields": OPENSEARCH_JSON_FIELDS, + "host_field": OPENSEARCH_HOST_FIELD, + "offset_field": OPENSEARCH_OFFSET_FIELD, + }, + } + DEFAULT_LOGGING_CONFIG["handlers"].update(OPENSEARCH_REMOTE_HANDLERS) else: raise AirflowException( "Incorrect remote log configuration. Please check the configuration of option 'host' in " diff --git a/airflow/config_templates/unit_tests.cfg b/airflow/config_templates/unit_tests.cfg index af32b79f4b0a..27134c721821 100644 --- a/airflow/config_templates/unit_tests.cfg +++ b/airflow/config_templates/unit_tests.cfg @@ -97,6 +97,30 @@ dag_dir_list_interval = 0 # by users. job_heartbeat_sec = 2 +[opensearch] +host = +port = +username = +password = +log_id_template = +end_of_log_mark = end_of_log +write_stdout = False +json_format = False +json_fields = asctime, filename, lineno, levelname, message +host_field = host +offset_field = offset +index_patterns = _all +index_patterns_callable = + +[opensearch_configs] +http_compress = False +use_ssl = False +verify_certs = False +ssl_assert_hostname = False +ssl_show_warn = False +ca_certs = + + [example_section] # This section is used to test coercions of configuration values retrieval string_value = 21600 diff --git a/airflow/providers/opensearch/log/__init__.py b/airflow/providers/opensearch/log/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/airflow/providers/opensearch/log/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/opensearch/log/os_json_formatter.py b/airflow/providers/opensearch/log/os_json_formatter.py new file mode 100644 index 000000000000..e41530ac6604 --- /dev/null +++ b/airflow/providers/opensearch/log/os_json_formatter.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime + +import pendulum + +from airflow.utils.log.json_formatter import JSONFormatter + + +class OpensearchJSONFormatter(JSONFormatter): + """Convert a log record to JSON with ISO 8601 date and time format.""" + + default_time_format = "%Y-%m-%dT%H:%M:%S" + default_msec_format = "%s.%03d" + default_tz_format = "%z" + + def formatTime(self, record, datefmt=None): + """Return the creation time of the LogRecord in ISO 8601 date/time format in the local time zone.""" + # TODO: Use airflow.utils.timezone.from_timestamp(record.created, tz="local") + # as soon as min Airflow 2.9.0 + dt = datetime.fromtimestamp(record.created, tz=pendulum.local_timezone()) + s = dt.strftime(datefmt or self.default_time_format) + if self.default_msec_format: + s = self.default_msec_format % (s, record.msecs) + if self.default_tz_format: + s += dt.strftime(self.default_tz_format) + return s diff --git a/airflow/providers/opensearch/log/os_response.py b/airflow/providers/opensearch/log/os_response.py new file mode 100644 index 000000000000..de2635b4227d --- /dev/null +++ b/airflow/providers/opensearch/log/os_response.py @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Iterator + + +def _wrap(val): + if isinstance(val, dict): + return AttributeDict(val) + return val + + +class AttributeList: + """Helper class to provide attribute like access to List objects.""" + + def __init__(self, _list): + if not isinstance(_list, list): + _list = list(_list) + self._l_ = _list + + def __getitem__(self, k): + """Retrieve an item or a slice from the list. If the item is a dictionary, it is wrapped in an AttributeDict.""" + val = self._l_[k] + if isinstance(val, slice): + return AttributeList(val) + return _wrap(val) + + def __iter__(self): + """Provide an iterator for the list or the dictionary.""" + return (_wrap(i) for i in self._l_) + + def __bool__(self): + """Check if the list is non-empty.""" + return bool(self._l_) + + +class AttributeDict: + """Helper class to provide attribute like access to Dictionary objects.""" + + def __init__(self, d): + super().__setattr__("_d_", d) + + def __getattr__(self, attr_name): + """Retrieve an item as an attribute from the dictionary.""" + try: + return self.__getitem__(attr_name) + except KeyError: + raise AttributeError(f"{self.__class__.__name__!r} object has no attribute {attr_name!r}") + + def __getitem__(self, key): + """Retrieve an item using a key from the dictionary.""" + return _wrap(self._d_[key]) + + def to_dict(self): + return self._d_ + + +class Hit(AttributeDict): + """ + The Hit class is used to manage and access elements in a document. 
+ + It inherits from the AttributeDict class and provides + attribute-like access to its elements, similar to a dictionary. + """ + + def __init__(self, document): + data = {} + if "_source" in document: + data = document["_source"] + if "fields" in document: + data.update(document["fields"]) + + super().__init__(data) + super().__setattr__("meta", HitMeta(document)) + + +class HitMeta(AttributeDict): + """ + The HitMeta class is used to manage and access metadata of a document. + + This class inherits from the AttributeDict class and provides + attribute-like access to its elements. + """ + + def __init__(self, document, exclude=("_source", "_fields")): + d = {k[1:] if k.startswith("_") else k: v for (k, v) in document.items() if k not in exclude} + if "type" in d: + # make sure we are consistent everywhere in python + d["doc_type"] = d.pop("type") + super().__init__(d) + + +class OpensearchResponse(AttributeDict): + """ + The OpensearchResponse class is used to manage and access the response from an Opensearch search. + + This class can be iterated over directly to access hits in the response. Indexing the class instance + with an integer or slice will also access the hits. The class also evaluates to True + if there are any hits in the response. + + The hits property returns an AttributeList of hits in the response, with each hit transformed into + an instance of the doc_class if provided. + + The response parameter stores the dictionary returned by the Elasticsearch client search method. + """ + + def __init__(self, search, response, doc_class=None): + super().__setattr__("_search", search) + super().__setattr__("_doc_class", doc_class) + super().__init__(response) + + def __iter__(self) -> Iterator[Hit]: + """Provide an iterator over the hits in the Elasticsearch response.""" + return iter(self.hits) + + def __getitem__(self, key): + """Retrieve a specific hit or a slice of hits from the Elasticsearch response.""" + if isinstance(key, (slice, int)): + return self.hits[key] + return super().__getitem__(key) + + def __bool__(self): + """Evaluate the presence of hits in the Elasticsearch response.""" + return bool(self.hits) + + @property + def hits(self) -> list[Hit]: + """ + This property provides access to the hits (i.e., the results) of the Opensearch response. + + The hits are represented as an `AttributeList` of `Hit` instances, which allow for easy, + attribute-like access to the hit data. + + The hits are lazily loaded, meaning they're not processed until this property is accessed. + Upon first access, the hits data from the response is processed using the `_get_result` method + of the associated `Search` instance (i.e. an instance from ElasticsearchTaskHandler class), + and the results are stored for future accesses. + + Each hit also includes all the additional data present in the "hits" field of the response, + accessible as attributes of the hit. 
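+
+        A minimal illustrative sketch (the variable names and document fields shown are
+        assumptions for illustration, not guaranteed by this class)::
+
+            response = OpensearchResponse(task_handler, raw_response_dict)
+            if response:                      # True only when the response contains hits
+                first = response[0]           # same as response.hits[0]
+                print(first.message)          # attribute-style access to "_source" fields
+                print(first.meta.index)       # document metadata exposed through HitMeta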
+ """ + if not hasattr(self, "_hits"): + h = self._d_["hits"] + + try: + hits = AttributeList(map(self._search._get_result, h["hits"])) + except AttributeError as e: + raise TypeError("Could not parse hits.", e) + + super().__setattr__("_hits", hits) + for k in h: + setattr(self._hits, k, _wrap(h[k])) + return self._hits diff --git a/airflow/providers/opensearch/log/os_task_handler.py b/airflow/providers/opensearch/log/os_task_handler.py new file mode 100644 index 000000000000..99f9d416988f --- /dev/null +++ b/airflow/providers/opensearch/log/os_task_handler.py @@ -0,0 +1,567 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import contextlib +import logging +import sys +import time +from collections import defaultdict +from datetime import datetime +from operator import attrgetter +from typing import TYPE_CHECKING, Any, Callable, List, Literal, Tuple + +import pendulum +from opensearchpy import OpenSearch +from opensearchpy.exceptions import NotFoundError + +from airflow.configuration import conf +from airflow.exceptions import AirflowException +from airflow.models import DagRun +from airflow.providers.opensearch.log.os_json_formatter import OpensearchJSONFormatter +from airflow.providers.opensearch.log.os_response import Hit, OpensearchResponse +from airflow.utils import timezone +from airflow.utils.log.file_task_handler import FileTaskHandler +from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin +from airflow.utils.module_loading import import_string +from airflow.utils.session import create_session + +if TYPE_CHECKING: + from airflow.models.taskinstance import TaskInstance, TaskInstanceKey +USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template") +OsLogMsgType = List[Tuple[str, str]] +LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""} + + +def getattr_nested(obj, item, default): + """ + Get item from obj but return default if not found. + + E.g. calling ``getattr_nested(a, 'b.c', "NA")`` will return + ``a.b.c`` if such a value exists, and "NA" otherwise. + + :meta private: + """ + try: + return attrgetter(item)(obj) + except AttributeError: + return default + + +def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance: + """ + Given TI | TIKey, return a TI object. + + Will raise exception if no TI is found in the database. 
+    """
+    from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+
+    if not isinstance(ti, TaskInstanceKey):
+        return ti
+    val = (
+        session.query(TaskInstance)
+        .filter(
+            TaskInstance.task_id == ti.task_id,
+            TaskInstance.dag_id == ti.dag_id,
+            TaskInstance.run_id == ti.run_id,
+            TaskInstance.map_index == ti.map_index,
+        )
+        .one_or_none()
+    )
+    if isinstance(val, TaskInstance):
+        val.try_number = ti.try_number
+        return val
+    else:
+        raise AirflowException(f"Could not find TaskInstance for {ti}")
+
+
+def get_os_kwargs_from_config() -> dict[str, Any]:
+    open_search_config = conf.getsection("opensearch_configs")
+    kwargs_dict = {key: value for key, value in open_search_config.items()} if open_search_config else {}
+
+    return kwargs_dict
+
+
+class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
+    """
+    OpensearchTaskHandler is a Python log handler that reads and writes logs to OpenSearch.
+
+    As with the ElasticsearchTaskHandler, Airflow itself does not handle the indexing of logs.
+    Instead, logs are flushed to local files, and additional software (e.g., Filebeat, Logstash)
+    may be required to ship logs to OpenSearch. This handler then enables fetching and displaying
+    logs from OpenSearch.
+
+    To efficiently query and sort OpenSearch results, this handler assumes each
+    log message has a field `log_id` consisting of the task instance's primary keys:
+    `log_id = {dag_id}-{task_id}-{execution_date}-{try_number}`
+    Log messages with a specific log_id are sorted based on `offset`,
+    a unique integer that indicates each log message's order.
+    Timestamps here are unreliable because multiple log messages
+    might have the same timestamp.
+
+    :param base_log_folder: Base folder to store logs locally.
+    :param end_of_log_mark: A marker string to signify the end of logs.
+    :param write_stdout: Whether to also write logs to stdout.
+    :param json_format: Whether to format logs as JSON.
+    :param json_fields: Comma-separated list of fields to include in the JSON log output.
+    :param host: OpenSearch host name.
+    :param port: OpenSearch port.
+    :param username: Username for OpenSearch authentication.
+    :param password: Password for OpenSearch authentication.
+    :param host_field: The field name for the host in the logs (default is "host").
+    :param offset_field: The field name for the log offset (default is "offset").
+    :param index_patterns: Index pattern or template used when searching for logs.
+    :param index_patterns_callable: Callable that dynamically generates index patterns based on context.
+    :param os_kwargs: Additional OpenSearch client options. This can be set to "default_os_kwargs" to
+        load the default configuration from Airflow's settings.
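+
+    As a minimal sketch (the module path and the returned pattern below are illustrative
+    assumptions, not part of this provider), a custom ``index_patterns_callable`` is a
+    dotted path to a function that receives the task instance and returns a comma
+    separated list of index patterns to search::
+
+        # my_company/log_utils.py  (hypothetical module)
+        def resolve_index_patterns(ti) -> str:
+            # e.g. search a per-DAG index plus a catch-all pattern
+            return f"airflow-logs-{ti.dag_id}-*,airflow-logs-default-*"
+
+    configured in ``airflow.cfg`` as
+    ``index_patterns_callable = my_company.log_utils.resolve_index_patterns``.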
+ """ + + PAGE = 0 + MAX_LINE_PER_PAGE = 1000 + LOG_NAME = "Opensearch" + + trigger_should_wrap = True + + def __init__( + self, + base_log_folder: str, + end_of_log_mark: str, + write_stdout: bool, + json_format: bool, + json_fields: str, + host: str, + port: int, + username: str, + password: str, + host_field: str = "host", + offset_field: str = "offset", + index_patterns: str = conf.get("opensearch", "index_patterns", fallback="_all"), + index_patterns_callable: str = conf.get("opensearch", "index_patterns_callable", fallback=""), + os_kwargs: dict | None | Literal["default_os_kwargs"] = "default_os_kwargs", + ): + os_kwargs = os_kwargs or {} + if os_kwargs == "default_os_kwargs": + os_kwargs = get_os_kwargs_from_config() + super().__init__(base_log_folder) + self.closed = False + self.mark_end_on_close = True + self.end_of_log_mark = end_of_log_mark.strip() + self.write_stdout = write_stdout + self.json_format = json_format + self.json_fields = [label.strip() for label in json_fields.split(",")] + self.host_field = host_field + self.offset_field = offset_field + self.index_patterns = index_patterns + self.index_patterns_callable = index_patterns_callable + self.context_set = False + self.client = OpenSearch( + hosts=[{"host": host, "port": port}], + http_auth=(username, password), + **os_kwargs, + ) + # client = OpenSearch(hosts=[{"host": host, "port": port}], http_auth=(username, password), use_ssl=True, verify_certs=True, ca_cert="/opt/airflow/root-ca.pem", ssl_assert_hostname = False, ssl_show_warn = False) + self.formatter: logging.Formatter + self.handler: logging.FileHandler | logging.StreamHandler # type: ignore[assignment] + self._doc_type_map: dict[Any, Any] = {} + self._doc_type: list[Any] = [] + + def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None: + """ + Provide task_instance context to airflow task handler. + + :param ti: task instance object + :param identifier: if set, identifies the Airflow component which is relaying logs from + exceptional scenarios related to the task instance + """ + is_trigger_log_context = getattr(ti, "is_trigger_log_context", None) + is_ti_raw = getattr(ti, "raw", None) + self.mark_end_on_close = not is_ti_raw and not is_trigger_log_context + + if self.json_format: + self.formatter = OpensearchJSONFormatter( + fmt=self.formatter._fmt, + json_fields=[*self.json_fields, self.offset_field], + extras={ + "dag_id": str(ti.dag_id), + "task_id": str(ti.task_id), + "execution_date": self._clean_date(ti.execution_date), + "try_number": str(ti.try_number), + "log_id": self._render_log_id(ti, ti.try_number), + }, + ) + + if self.write_stdout: + if self.context_set: + # We don't want to re-set up the handler if this logger has + # already been initialized + return + + self.handler = logging.StreamHandler(stream=sys.__stdout__) + self.handler.setLevel(self.level) + self.handler.setFormatter(self.formatter) + else: + # todo: remove-at-min-airflow-version-2.8 + # after Airflow 2.8 can always pass `identifier` + if getattr(super(), "supports_task_context_logging", False): + super().set_context(ti, identifier=identifier) + else: + super().set_context(ti) + self.context_set = True + + def emit(self, record): + if self.handler: + setattr(record, self.offset_field, int(time.time() * (10**9))) + self.handler.emit(record) + + def close(self) -> None: + # When application exit, system shuts down all handlers by + # calling close method. 
Here we check if logger is already + # closed to prevent uploading the log to remote storage multiple + # times when `logging.shutdown` is called. + if self.closed: + return + + if not self.mark_end_on_close: + # when we're closing due to task deferral, don't mark end of log + self.closed = True + return + + # Case which context of the handler was not set. + if self.handler is None: + self.closed = True + return + + # Reopen the file stream, because FileHandler.close() would be called + # first in logging.shutdown() and the stream in it would be set to None. + if self.handler.stream is None or self.handler.stream.closed: # type: ignore[attr-defined] + self.handler.stream = self.handler._open() # type: ignore[union-attr] + + # Mark the end of file using end of log mark, + # so we know where to stop while auto-tailing. + self.emit(logging.makeLogRecord({"msg": self.end_of_log_mark})) + + if self.write_stdout: + self.handler.close() + sys.stdout = sys.__stdout__ + + super().close() + + self.closed = True + + def _read_grouped_logs(self): + return True + + @staticmethod + def _clean_date(value: datetime | None) -> str: + """ + Clean up a date value so that it is safe to query in elasticsearch by removing reserved characters. + + https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters + """ + if value is None: + return "" + return value.strftime("%Y_%m_%dT%H_%M_%S_%f") + + def _render_log_id(self, ti: TaskInstance | TaskInstanceKey, try_number: int) -> str: + from airflow.models.taskinstance import TaskInstanceKey + + with create_session() as session: + if isinstance(ti, TaskInstanceKey): + ti = _ensure_ti(ti, session) + dag_run = ti.get_dagrun(session=session) + if USE_PER_RUN_LOG_ID: + log_id_template = dag_run.get_log_template(session=session).elasticsearch_id + + if TYPE_CHECKING: + assert ti.task + try: + dag = ti.task.dag + except AttributeError: # ti.task is not always set. + data_interval = (dag_run.data_interval_start, dag_run.data_interval_end) + else: + if TYPE_CHECKING: + assert dag is not None + data_interval = dag.get_run_data_interval(dag_run) + + if self.json_format: + data_interval_start = self._clean_date(data_interval[0]) + data_interval_end = self._clean_date(data_interval[1]) + execution_date = self._clean_date(dag_run.execution_date) + else: + if data_interval[0]: + data_interval_start = data_interval[0].isoformat() + else: + data_interval_start = "" + if data_interval[1]: + data_interval_end = data_interval[1].isoformat() + else: + data_interval_end = "" + execution_date = dag_run.execution_date.isoformat() + + return log_id_template.format( + dag_id=ti.dag_id, + task_id=ti.task_id, + run_id=getattr(ti, "run_id", ""), + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + execution_date=execution_date, + try_number=try_number, + map_index=getattr(ti, "map_index", ""), + ) + + def _read( + self, ti: TaskInstance, try_number: int, metadata: dict | None = None + ) -> tuple[OsLogMsgType, dict]: + """ + Endpoint for streaming log. + + :param ti: task instance object + :param try_number: try_number of the task instance + :param metadata: log metadata, + can be used for steaming log reading and auto-tailing. + :return: a list of tuple with host and log documents, metadata. 
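+
+        A sketch of the metadata dict passed back and forth while tailing (these are the
+        main keys this handler reads and writes; the values shown are purely illustrative)::
+
+            {"offset": "1688888863907337472",
+             "last_log_timestamp": "2023-07-09T07:47:45+00:00",
+             "end_of_log": False}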
+ """ + if not metadata: + metadata = {"offset": 0} + if "offset" not in metadata: + metadata["offset"] = 0 + + offset = metadata["offset"] + log_id = self._render_log_id(ti, try_number) + response = self._os_read(log_id, offset, ti) + if response is not None and response.hits: + logs_by_host = self._group_logs_by_host(response) + next_offset = attrgetter(self.offset_field)(response[-1]) + else: + logs_by_host = None + next_offset = offset + + # Ensure a string here. Large offset numbers will get JSON.parsed incorrectly + # on the client. Sending as a string prevents this issue. + # https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER + metadata["offset"] = str(next_offset) + + # end_of_log_mark may contain characters like '\n' which is needed to + # have the log uploaded but will not be stored in elasticsearch. + metadata["end_of_log"] = False + if logs_by_host: + if any(x[-1].message == self.end_of_log_mark for x in logs_by_host.values()): + metadata["end_of_log"] = True + + cur_ts = pendulum.now() + if "last_log_timestamp" in metadata: + last_log_ts = timezone.parse(metadata["last_log_timestamp"]) + + # if we are not getting any logs at all after more than N seconds of trying, + # assume logs do not exist + if int(next_offset) == 0 and cur_ts.diff(last_log_ts).in_seconds() > 5: + metadata["end_of_log"] = True + missing_log_message = ( + f"*** Log {log_id} not found in Opensearch. " + "If your task started recently, please wait a moment and reload this page. " + "Otherwise, the logs for this task instance may have been removed." + ) + return [("", missing_log_message)], metadata + if ( + # Assume end of log after not receiving new log for N min, + cur_ts.diff(last_log_ts).in_minutes() >= 5 + # if max_offset specified, respect it + or ("max_offset" in metadata and int(offset) >= int(metadata["max_offset"])) + ): + metadata["end_of_log"] = True + + if int(offset) != int(next_offset) or "last_log_timestamp" not in metadata: + metadata["last_log_timestamp"] = str(cur_ts) + + # If we hit the end of the log, remove the actual end_of_log message + # to prevent it from showing in the UI. + def concat_logs(hits: list[Hit]): + log_range = (len(hits) - 1) if hits[-1].message == self.end_of_log_mark else len(hits) + return "\n".join(self._format_msg(hits[i]) for i in range(log_range)) + + if logs_by_host: + message = [(host, concat_logs(hits)) for host, hits in logs_by_host.items()] + else: + message = [] + return message, metadata + + def _os_read(self, log_id: str, offset: int | str, ti: TaskInstance) -> OpensearchResponse | None: + """ + Return the logs matching log_id in Elasticsearch and next offset or ''. + + :param log_id: the log_id of the log to read. + :param offset: the offset start to read log from. 
+ :param ti: the task instance object + + :meta private: + """ + query: dict[Any, Any] = { + "query": { + "bool": { + "filter": [{"range": {self.offset_field: {"gt": int(offset)}}}], + "must": [{"match_phrase": {"log_id": log_id}}], + } + } + } + index_patterns = self._get_index_patterns(ti) + try: + max_log_line = self.client.count(index=index_patterns, body=query)["count"] # type: ignore + except NotFoundError as e: + self.log.exception("The target index pattern %s does not exist", index_patterns) + raise e + + if max_log_line != 0: + try: + res = self.client.search( + index=index_patterns, + body=query, + sort=[self.offset_field], + size=self.MAX_LINE_PER_PAGE, + from_=self.MAX_LINE_PER_PAGE * self.PAGE, + ) + return OpensearchResponse(self, res) + except Exception as err: + self.log.exception("Could not read log with log_id: %s. Exception: %s", log_id, err) + + return None + + def _get_index_patterns(self, ti: TaskInstance | None) -> str: + """ + Get index patterns by calling index_patterns_callable, if provided, or the configured index_patterns. + + :param ti: A TaskInstance object or None. + """ + if self.index_patterns_callable: + self.log.debug("Using index_patterns_callable: %s", self.index_patterns_callable) + index_pattern_callable_obj = import_string(self.index_patterns_callable) + return index_pattern_callable_obj(ti) + self.log.debug("Using index_patterns: %s", self.index_patterns) + return self.index_patterns + + def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit: + """ + Process a hit (i.e., a result) from an Elasticsearch response and transform it into a class instance. + + The transformation depends on the contents of the hit. If the document in hit contains a nested field, + the '_resolve_nested' method is used to determine the appropriate class (based on the nested path). + If the hit has a document type that is present in the '_doc_type_map', the corresponding class is + used. If not, the method iterates over the '_doc_type' classes and uses the first one whose '_matches' + method returns True for the hit. + + If the hit contains any 'inner_hits', these are also processed into 'ElasticSearchResponse' instances + using the determined class. + + Finally, the transformed hit is returned. 
If the determined class has a 'from_es' method, this is + used to transform the hit + + An example of the hit argument: + + {'_id': 'jdeZT4kBjAZqZnexVUxk', + '_index': '.ds-filebeat-8.8.2-2023.07.09-000001', + '_score': 2.482621, + '_source': {'@timestamp': '2023-07-13T14:13:15.140Z', + 'asctime': '2023-07-09T07:47:43.907+0000', + 'container': {'id': 'airflow'}, + 'dag_id': 'example_bash_operator', + 'ecs': {'version': '8.0.0'}, + 'execution_date': '2023_07_09T07_47_32_000000', + 'filename': 'taskinstance.py', + 'input': {'type': 'log'}, + 'levelname': 'INFO', + 'lineno': 1144, + 'log': {'file': {'path': "/opt/airflow/Documents/GitHub/airflow/logs/ + dag_id=example_bash_operator'/run_id=owen_run_run/ + task_id=run_after_loop/attempt=1.log"}, + 'offset': 0}, + 'log.offset': 1688888863907337472, + 'log_id': 'example_bash_operator-run_after_loop-owen_run_run--1-1', + 'message': 'Dependencies all met for dep_context=non-requeueable ' + 'deps ti=', + 'task_id': 'run_after_loop', + 'try_number': '1'}, + '_type': '_doc'} + """ + doc_class = Hit + dt = hit.get("_type") + + if "_nested" in hit: + doc_class = self._resolve_nested(hit, parent_class) + + elif dt in self._doc_type_map: + doc_class = self._doc_type_map[dt] + + else: + for doc_type in self._doc_type: + if hasattr(doc_type, "_matches") and doc_type._matches(hit): + doc_class = doc_type + break + + for t in hit.get("inner_hits", ()): + hit["inner_hits"][t] = OpensearchResponse(self, hit["inner_hits"][t], doc_class=doc_class) + + # callback should get the Hit class if "from_es" is not defined + callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class) + return callback(hit) + + def _resolve_nested(self, hit: dict[Any, Any], parent_class=None) -> type[Hit]: + """ + Resolve nested hits from Elasticsearch by iteratively navigating the `_nested` field. + + The result is used to fetch the appropriate document class to handle the hit. + + This method can be used with nested Elasticsearch fields which are structured + as dictionaries with "field" and "_nested" keys. + """ + doc_class = Hit + + nested_path: list[str] = [] + nesting = hit["_nested"] + while nesting and "field" in nesting: + nested_path.append(nesting["field"]) + nesting = nesting.get("_nested") + nested_path_str = ".".join(nested_path) + + if hasattr(parent_class, "_index"): + nested_field = parent_class._index.resolve_field(nested_path_str) + + if nested_field is not None: + return nested_field._doc_class + + return doc_class + + def _group_logs_by_host(self, response: OpensearchResponse) -> dict[str, list[Hit]]: + grouped_logs = defaultdict(list) + for hit in response: + key = getattr_nested(hit, self.host_field, None) or "default_host" + grouped_logs[key].append(hit) + return grouped_logs + + def _format_msg(self, hit: Hit): + """Format ES Record to match settings.LOG_FORMAT when used with json_format.""" + # Using formatter._style.format makes it future proof i.e. 
+ # if we change the formatter style from '%' to '{' or '$', this will still work + if self.json_format: + with contextlib.suppress(Exception): + return self.formatter._style.format( + logging.makeLogRecord({**LOG_LINE_DEFAULTS, **hit.to_dict()}) + ) + + # Just a safe-guard to preserve backwards-compatibility + return hit.message diff --git a/airflow/providers/opensearch/provider.yaml b/airflow/providers/opensearch/provider.yaml index 3902727c8a62..9cfea5054091 100644 --- a/airflow/providers/opensearch/provider.yaml +++ b/airflow/providers/opensearch/provider.yaml @@ -59,3 +59,146 @@ operators: connection-types: - hook-class-name: airflow.providers.opensearch.hooks.opensearch.OpenSearchHook connection-type: opensearch + +logging: + - airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler + +config: + opensearch: + description: ~ + options: + host: + description: | + Opensearch host + version_added: 1.5.0 + type: string + example: ~ + default: "" + port: + description: | + The port number of Opensearch host + version_added: 1.5.0 + type: integer + example: ~ + default: "" + username: + description: | + The username for connecting to Opensearch + version_added: 1.5.0 + type: string + sensitive: true + example: ~ + default: "" + password: + description: | + The password for connecting to Opensearch + version_added: 1.5.0 + type: string + sensitive: true + example: ~ + default: "" + log_id_template: + description: | + Format of the log_id, which is used to query for a given tasks logs + version_added: 1.5.0 + type: string + example: ~ + is_template: true + default: "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}" + end_of_log_mark: + description: | + Used to mark the end of a log stream for a task + version_added: 1.5.0 + type: string + example: ~ + default: "end_of_log" + write_stdout: + description: | + Write the task logs to the stdout of the worker, rather than the default files + version_added: 1.5.0 + type: string + example: ~ + default: "False" + json_format: + description: | + Instead of the default log formatter, write the log lines as JSON + version_added: 1.5.0 + type: string + example: ~ + default: "False" + json_fields: + description: | + Log fields to also attach to the json output, if enabled + version_added: 1.5.0 + type: string + example: ~ + default: "asctime, filename, lineno, levelname, message" + host_field: + description: | + The field where host name is stored (normally either `host` or `host.name`) + version_added: 1.5.0 + type: string + example: ~ + default: "host" + offset_field: + description: | + The field where offset is stored (normally either `offset` or `log.offset`) + version_added: 1.5.0 + type: string + example: ~ + default: "offset" + index_patterns: + description: | + Comma separated list of index patterns to use when searching for logs (default: `_all`). + The index_patterns_callable takes precedence over this. + version_added: 1.5.0 + type: string + example: something-* + default: "_all" + index_patterns_callable: + description: | + A string representing the full path to the Python callable path which accept TI object and + return comma separated list of index patterns. This will takes precedence over index_patterns. 
+ version_added: 1.5.0 + type: string + example: module.callable + default: "" + opensearch_configs: + description: ~ + options: + http_compress: + description: ~ + version_added: 1.5.0 + type: string + example: ~ + default: "False" + use_ssl: + description: ~ + version_added: 1.5.0 + type: string + example: ~ + default: "False" + verify_certs: + description: ~ + version_added: 1.5.0 + type: string + example: ~ + default: "False" + ssl_assert_hostname: + description: ~ + version_added: 1.5.0 + type: string + example: ~ + default: "False" + ssl_show_warn: + description: ~ + version_added: 1.5.0 + type: string + example: ~ + default: "False" + ca_certs: + description: ~ + version_added: 1.5.0 + type: string + example: ~ + default: "" diff --git a/airflow/utils/db.py b/airflow/utils/db.py index b11e744198dc..512195c3aa96 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -474,6 +474,16 @@ def create_default_connections(session: Session = NEW_SESSION): ), session, ) + merge_conn( + Connection( + conn_id="opensearch_default", + conn_type="opensearch", + host="localhost", + schema="http", + port=9200, + ), + session, + ) merge_conn( Connection( conn_id="opsgenie_default", diff --git a/docs/apache-airflow-providers-opensearch/configurations-ref.rst b/docs/apache-airflow-providers-opensearch/configurations-ref.rst new file mode 100644 index 000000000000..5885c9d91b6e --- /dev/null +++ b/docs/apache-airflow-providers-opensearch/configurations-ref.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. include:: ../exts/includes/providers-configurations-ref.rst diff --git a/docs/apache-airflow-providers-opensearch/index.rst b/docs/apache-airflow-providers-opensearch/index.rst index ae911adbb323..d15064228892 100644 --- a/docs/apache-airflow-providers-opensearch/index.rst +++ b/docs/apache-airflow-providers-opensearch/index.rst @@ -35,6 +35,7 @@ :caption: Guides Connection types + Logging for Tasks Operators .. toctree:: @@ -42,6 +43,7 @@ :maxdepth: 1 :caption: References + Configuration Python API <_api/airflow/providers/opensearch/index> .. toctree:: diff --git a/docs/apache-airflow-providers-opensearch/logging/index.rst b/docs/apache-airflow-providers-opensearch/logging/index.rst new file mode 100644 index 000000000000..9e27d8bd66a5 --- /dev/null +++ b/docs/apache-airflow-providers-opensearch/logging/index.rst @@ -0,0 +1,72 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. 
http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _write-logs-opensearch:
+
+Writing logs to OpenSearch
+--------------------------
+
+*Added in provider version 1.5.0. Available only with Airflow >= 3.0.*
+
+Airflow can be configured to read task logs from OpenSearch and, optionally, to write logs to stdout in standard or JSON format. These logs can later be collected and forwarded to the OpenSearch cluster using tools like Fluentd, Logstash or others.
+
+You can choose to have all task logs from workers output to the highest parent level process, instead of the standard file locations. This allows for some additional flexibility in container environments like Kubernetes, where container stdout is already being logged to the host nodes. From there a log shipping tool can be used to forward them along to OpenSearch. To use this feature, set the ``write_stdout`` option in ``airflow.cfg``.
+You can also choose to have the logs output in JSON format, using the ``json_format`` option. Airflow uses the standard Python logging module, and JSON fields are directly extracted from the LogRecord object. To use this feature, set the ``json_fields`` option in ``airflow.cfg``. Add the fields that you want collected for the logs to the comma-delimited string. These fields are from the LogRecord object in the ``logging`` module. `Documentation on different attributes can be found here <https://docs.python.org/3/library/logging.html#logrecord-attributes>`_.
+
+First, to use the handler, ``airflow.cfg`` must be configured as follows:
+
+.. code-block:: ini
+
+    [logging]
+    remote_logging = True
+
+    [opensearch]
+    host =
+    port =
+    username =
+    password =
+
+To output task logs to stdout in JSON format, the following config could be used:
+
+.. code-block:: ini
+
+    [logging]
+    remote_logging = True
+
+    [opensearch]
+    write_stdout = True
+    json_format = True
+
+.. _write-logs-opensearch-tls:
+
+Writing logs to OpenSearch over TLS
+'''''''''''''''''''''''''''''''''''
+
+To add custom configurations to the OpenSearch client (e.g. turning on ``use_ssl`` and ``verify_certs``, adding a custom self-signed
+cert, etc.), use the ``opensearch_configs`` section in your ``airflow.cfg``:
+
+.. 
code-block:: ini + + [logging] + remote_logging = True + + [opensearch_configs] + use_ssl = True + verify_certs = True + ssl_assert_hostname = True + ca_certs=/path/to/CA_certs diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py index db057c43ca52..1590fb7ee231 100644 --- a/tests/core/test_configuration.py +++ b/tests/core/test_configuration.py @@ -1609,6 +1609,8 @@ def test_sensitive_values(): ("celery", "broker_url"), ("celery", "flower_basic_auth"), ("celery", "result_backend"), + ("opensearch", "username"), + ("opensearch", "password"), } all_keys = {(s, k) for s, v in conf.configuration_description.items() for k in v.get("options")} suspected_sensitive = {(s, k) for (s, k) in all_keys if k.endswith(("password", "kwargs"))} diff --git a/tests/providers/opensearch/conftest.py b/tests/providers/opensearch/conftest.py index 934bbd642ad5..35291b0d6e2e 100644 --- a/tests/providers/opensearch/conftest.py +++ b/tests/providers/opensearch/conftest.py @@ -50,6 +50,126 @@ def index(self, document: dict, index_name: str, doc_id: int, **kwargs: Any) -> return doc_id +class MockClient(OpenSearch): + def count(self, index: Any = None, body: Any = None): + return {"count": 1, "_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0}} + + def search(self, index=None, body=None, sort=None, size=None, from_=None): + return self.sample_log_response() + + def sample_log_response(self): + return { + "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total": 7}, + "hits": { + "hits": [ + { + "_id": "jdeZT4kBjAZqZnexVUxk", + "_index": ".ds-filebeat-8.8.2-2023.07.09-000001", + "_score": 2.482621, + "_source": { + "@timestamp": "2023-07-13T14:13:15.140Z", + "asctime": "2023-07-09T07:47:43.907+0000", + "container": {"id": "airflow"}, + "dag_id": "example_bash_operator", + "ecs": {"version": "8.0.0"}, + "execution_date": "2023_07_09T07_47_32_000000", + "filename": "taskinstance.py", + "input": {"type": "log"}, + "levelname": "INFO", + "lineno": 1144, + "log": { + "file": { + "path": "/opt/airflow/Documents/GitHub/airflow/logs/" + "dag_id=example_bash_operator'" + "/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log" + }, + "offset": 0, + }, + "offset": 1688888863907337472, + "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1", + "message": "Dependencies all met for " + "dep_context=non-requeueable deps " + "ti=", + "task_id": "run_after_loop", + "try_number": "1", + }, + "_type": "_doc", + }, + { + "_id": "qteZT4kBjAZqZnexVUxl", + "_index": ".ds-filebeat-8.8.2-2023.07.09-000001", + "_score": 2.482621, + "_source": { + "@timestamp": "2023-07-13T14:13:15.141Z", + "asctime": "2023-07-09T07:47:43.917+0000", + "container": {"id": "airflow"}, + "dag_id": "example_bash_operator", + "ecs": {"version": "8.0.0"}, + "execution_date": "2023_07_09T07_47_32_000000", + "filename": "taskinstance.py", + "input": {"type": "log"}, + "levelname": "INFO", + "lineno": 1347, + "log": { + "file": { + "path": "/opt/airflow/Documents/GitHub/airflow/logs/" + "dag_id=example_bash_operator" + "/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log" + }, + "offset": 988, + }, + "offset": 1688888863917961216, + "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1", + "message": "Starting attempt 1 of 1", + "task_id": "run_after_loop", + "try_number": "1", + }, + "_type": "_doc", + }, + { + "_id": "v9eZT4kBjAZqZnexVUx2", + "_index": ".ds-filebeat-8.8.2-2023.07.09-000001", + "_score": 2.482621, + "_source": { + "@timestamp": "2023-07-13T14:13:15.143Z", + 
"asctime": "2023-07-09T07:47:43.928+0000", + "container": {"id": "airflow"}, + "dag_id": "example_bash_operator", + "ecs": {"version": "8.0.0"}, + "execution_date": "2023_07_09T07_47_32_000000", + "filename": "taskinstance.py", + "input": {"type": "log"}, + "levelname": "INFO", + "lineno": 1368, + "log": { + "file": { + "path": "/opt/airflow/Documents/GitHub/airflow/logs/" + "dag_id=example_bash_operator" + "/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log" + }, + "offset": 1372, + }, + "offset": 1688888863928218880, + "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1", + "message": "Executing on 2023-07-09 " + "07:47:32+00:00", + "task_id": "run_after_loop", + "try_number": "1", + }, + "_type": "_doc", + }, + ], + "max_score": 2.482621, + "total": {"relation": "eq", "value": 36}, + }, + "timed_out": False, + "took": 7, + } + + @pytest.fixture def mock_hook(monkeypatch): monkeypatch.setattr(OpenSearchHook, "search", MockSearch.search) diff --git a/tests/providers/opensearch/log/__init__.py b/tests/providers/opensearch/log/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/providers/opensearch/log/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/opensearch/log/test_os_json_formatter.py b/tests/providers/opensearch/log/test_os_json_formatter.py new file mode 100644 index 000000000000..27cf2be21163 --- /dev/null +++ b/tests/providers/opensearch/log/test_os_json_formatter.py @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +import json +import logging + +import pendulum +import pytest + +from airflow.providers.opensearch.log.os_task_handler import ( + OpensearchJSONFormatter, +) + +pytestmark = pytest.mark.db_test + + +class TestOpensearchJSONFormatter: + JSON_FIELDS = ["asctime", "filename", "lineno", "levelname", "message", "exc_text"] + EXTRA_FIELDS = { + "dag_id": "dag1", + "task_id": "task1", + "execution_date": "2023-11-17", + "try_number": "1", + "log_id": "Some_log_id", + } + + @pytest.fixture + def os_json_formatter(self): + return OpensearchJSONFormatter() + + @pytest.fixture + def log_record(self): + return logging.LogRecord( + name="test", + level=logging.INFO, + pathname="test_file.txt", + lineno=1, + msg="Test message", + args=(), + exc_info=None, + ) + + def test_format_log_record(self, os_json_formatter, log_record): + """Test the log record formatting.""" + os_json_formatter.json_fields = self.JSON_FIELDS + formatted = os_json_formatter.format(log_record) + data = json.loads(formatted) + assert all(key in self.JSON_FIELDS for key in data.keys()) + assert data["filename"] == "test_file.txt" + assert data["lineno"] == 1 + assert data["levelname"] == "INFO" + assert data["message"] == "Test message" + + def test_formattime_in_iso8601_format(self, os_json_formatter, log_record): + os_json_formatter.json_fields = ["asctime"] + iso8601_format = os_json_formatter.formatTime(log_record) + try: + pendulum.parse(iso8601_format, strict=True) + except ValueError: + raise Exception("Time is not in ISO8601 format") + + def test_extra_fields(self, os_json_formatter, log_record): + os_json_formatter.json_fields = self.JSON_FIELDS + os_json_formatter.extras = self.EXTRA_FIELDS + formatted = os_json_formatter.format(log_record) + data = json.loads(formatted) + assert all((key in self.JSON_FIELDS or key in self.EXTRA_FIELDS) for key in data.keys()) + assert data["filename"] == "test_file.txt" + assert data["lineno"] == 1 + assert data["levelname"] == "INFO" + assert data["dag_id"] == "dag1" + assert data["task_id"] == "task1" + assert data["execution_date"] == "2023-11-17" + assert data["try_number"] == "1" + assert data["log_id"] == "Some_log_id" diff --git a/tests/providers/opensearch/log/test_os_response.py b/tests/providers/opensearch/log/test_os_response.py new file mode 100644 index 000000000000..d4c1858d37a9 --- /dev/null +++ b/tests/providers/opensearch/log/test_os_response.py @@ -0,0 +1,134 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from typing import Any + +import pytest + +from airflow.providers.opensearch.log.os_response import ( + AttributeList, + Hit, + HitMeta, + OpensearchResponse, +) +from airflow.providers.opensearch.log.os_task_handler import OpensearchTaskHandler + +pytestmark = pytest.mark.db_test + + +class TestHitAndHitMetaAndOpenSearchResponse: + OS_DOCUMENT: dict[str, Any] = { + "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total": 7}, + "hits": { + "hits": [ + { + "_id": "jdeZT4kBjAZqZnexVUxk", + "_index": ".ds-filebeat-8.8.2-2023.07.09-000001", + "_score": 2.482621, + "_source": { + "@timestamp": "2023-07-13T14:13:15.140Z", + "asctime": "2023-07-09T07:47:43.907+0000", + "container": {"id": "airflow"}, + "dag_id": "example_bash_operator", + "ecs": {"version": "8.0.0"}, + "execution_date": "2023_07_09T07_47_32_000000", + "filename": "taskinstance.py", + "input": {"type": "log"}, + "levelname": "INFO", + "lineno": 1144, + "log": { + "file": { + "path": "/opt/airflow/Documents/GitHub/airflow/logs/" + "dag_id=example_bash_operator'" + "/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log" + }, + "offset": 0, + }, + "log.offset": 1688888863907337472, + "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1", + "message": "Dependencies all met for " + "dep_context=non-requeueable deps " + "ti=", + "task_id": "run_after_loop", + "try_number": "1", + }, + "_type": "_doc", + } + ] + }, + } + HIT_DOCUMENT = OS_DOCUMENT["hits"]["hits"][0] + + def test_hit_initialization_and_to_dict(self): + hit = Hit(self.HIT_DOCUMENT) + + assert hit.asctime == "2023-07-09T07:47:43.907+0000" + assert hit.dag_id == "example_bash_operator" + assert hit.lineno == 1144 + assert ( + hit.log.file.path + == "/opt/airflow/Documents/GitHub/airflow/logs/dag_id=example_bash_operator'/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log" + ) + + # Test meta attribute + assert isinstance(hit.meta, HitMeta) + assert hit.to_dict() == self.HIT_DOCUMENT["_source"] + + def test_hitmeta_initialization_and_to_dict(self): + hitmeta = HitMeta(self.HIT_DOCUMENT) + + assert hitmeta.id == "jdeZT4kBjAZqZnexVUxk" + assert hitmeta.index == ".ds-filebeat-8.8.2-2023.07.09-000001" + assert hitmeta.score == 2.482621 + assert hitmeta.doc_type == "_doc" + + expected_dict = { + k[1:] if k.startswith("_") else k: v for (k, v) in self.HIT_DOCUMENT.items() if k != "_source" + } + expected_dict["doc_type"] = expected_dict.pop("type") + assert hitmeta.to_dict() == expected_dict + + def test_opensearchresponse_initialization_and_hits_and_bool(self): + task_handler = OpensearchTaskHandler( + base_log_folder="local/log/location", + end_of_log_mark="end_of_log\n", + write_stdout=False, + host="localhost", + port=9200, + username="dummy", + password="dummy", + json_format=False, + json_fields="asctime,filename,lineno,levelname,message,exc_text", + ) + response = OpensearchResponse(task_handler, self.OS_DOCUMENT) + + assert response._d_ == self.OS_DOCUMENT + assert isinstance(response.hits, AttributeList) + + for hit in response.hits: + assert isinstance(hit, Hit) + assert isinstance(hit.meta, HitMeta) + + assert response.hits[0].asctime == "2023-07-09T07:47:43.907+0000" + assert response.hits[0].levelname == "INFO" + + assert bool(response) is True diff --git a/tests/providers/opensearch/log/test_os_task_handler.py b/tests/providers/opensearch/log/test_os_task_handler.py new file mode 100644 index 000000000000..27f2ac772838 --- /dev/null +++ b/tests/providers/opensearch/log/test_os_task_handler.py @@ 
-0,0 +1,499 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import json +import logging +import os +import re +import shutil +from io import StringIO +from pathlib import Path +from unittest import mock +from unittest.mock import Mock, patch + +import pendulum +import pytest +from opensearchpy import OpenSearch +from opensearchpy.exceptions import NotFoundError + +from airflow.configuration import conf +from airflow.providers.opensearch.log.os_response import OpensearchResponse +from airflow.providers.opensearch.log.os_task_handler import ( + OpensearchTaskHandler, + get_os_kwargs_from_config, + getattr_nested, +) +from airflow.utils import timezone +from airflow.utils.state import DagRunState, TaskInstanceState +from airflow.utils.timezone import datetime +from tests.providers.opensearch.conftest import MockClient +from tests.test_utils.config import conf_vars +from tests.test_utils.db import clear_db_dags, clear_db_runs + +pytestmark = pytest.mark.db_test + +AIRFLOW_SOURCES_ROOT_DIR = Path(__file__).parents[4].resolve() +ES_PROVIDER_YAML_FILE = AIRFLOW_SOURCES_ROOT_DIR / "airflow" / "providers" / "opensearch" / "provider.yaml" + + +def get_ti(dag_id, task_id, execution_date, create_task_instance): + ti = create_task_instance( + dag_id=dag_id, + task_id=task_id, + execution_date=execution_date, + dagrun_state=DagRunState.RUNNING, + state=TaskInstanceState.RUNNING, + ) + ti.try_number = 1 + ti.raw = False + return ti + + +class TestOpensearchTaskHandler: + DAG_ID = "dag_for_testing_os_task_handler" + TASK_ID = "task_for_testing_os_log_handler" + EXECUTION_DATE = datetime(2016, 1, 1) + LOG_ID = f"{DAG_ID}-{TASK_ID}-2016-01-01T00:00:00+00:00-1" + JSON_LOG_ID = f"{DAG_ID}-{TASK_ID}-{OpensearchTaskHandler._clean_date(EXECUTION_DATE)}-1" + FILENAME_TEMPLATE = "{try_number}.log" + + @pytest.fixture + def ti(self, create_task_instance, create_log_template): + create_log_template(self.FILENAME_TEMPLATE, "{dag_id}-{task_id}-{execution_date}-{try_number}") + yield get_ti( + dag_id=self.DAG_ID, + task_id=self.TASK_ID, + execution_date=self.EXECUTION_DATE, + create_task_instance=create_task_instance, + ) + clear_db_runs() + clear_db_dags() + + def setup_method(self): + self.local_log_location = "local/log/location" + self.end_of_log_mark = "end_of_log\n" + self.write_stdout = False + self.json_format = False + self.json_fields = "asctime,filename,lineno,levelname,message,exc_text" + self.host_field = "host" + self.offset_field = "offset" + self.username = "admin" + self.password = "admin" + self.host = "localhost" + self.port = 9200 + self.os_task_handler = OpensearchTaskHandler( + base_log_folder=self.local_log_location, + end_of_log_mark=self.end_of_log_mark, + write_stdout=self.write_stdout, + host=self.host, + 
port=self.port, + username=self.username, + password=self.password, + json_format=self.json_format, + json_fields=self.json_fields, + host_field=self.host_field, + offset_field=self.offset_field, + ) + + self.os_task_handler.client = MockClient() + # self.index_name = "test_index" + # self.doc_type = "log" + # self.test_message = "some random stuff" + # self.body = {"message": self.test_message, "log_id": self.LOG_ID, "offset": 1} + # self.os.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=1) + + def teardown_method(self): + shutil.rmtree(self.local_log_location.split(os.path.sep)[0], ignore_errors=True) + + def test_os_response(self): + sample_response = self.os_task_handler.client.sample_log_response() + response = OpensearchResponse(self.os_task_handler, sample_response) + logs_by_host = self.os_task_handler._group_logs_by_host(response) + + def concat_logs(lines): + log_range = -1 if lines[-1].message == self.os_task_handler.end_of_log_mark else None + return "\n".join(self.os_task_handler._format_msg(line) for line in lines[:log_range]) + + for hosted_log in logs_by_host.values(): + message = concat_logs(hosted_log) + + assert ( + message == "Dependencies all met for dep_context=non-requeueable" + " deps ti=\n" + "Starting attempt 1 of 1\nExecuting " + "on 2023-07-09 07:47:32+00:00" + ) + + def test_client(self): + assert issubclass(type(self.os_task_handler.client), OpenSearch) + assert self.os_task_handler.index_patterns == "_all" + + def test_client_with_config(self): + config = dict(conf.getsection("opensearch_configs")) + expected_dict = { + "http_compress": False, + "use_ssl": False, + "verify_certs": False, + "ssl_assert_hostname": False, + "ssl_show_warn": False, + "ca_certs": "", + } + assert config == expected_dict + # ensure creating with configs does not fail + OpensearchTaskHandler( + base_log_folder=self.local_log_location, + end_of_log_mark=self.end_of_log_mark, + write_stdout=self.write_stdout, + host=self.host, + port=self.port, + username=self.username, + password=self.password, + json_format=self.json_format, + json_fields=self.json_fields, + host_field=self.host_field, + offset_field=self.offset_field, + os_kwargs=config, + ) + + def test_client_with_patterns(self): + # ensure creating with index patterns does not fail + patterns = "test_*,other_*" + handler = OpensearchTaskHandler( + base_log_folder=self.local_log_location, + end_of_log_mark=self.end_of_log_mark, + write_stdout=self.write_stdout, + host=self.host, + port=self.port, + username=self.username, + password=self.password, + json_format=self.json_format, + json_fields=self.json_fields, + host_field=self.host_field, + offset_field=self.offset_field, + index_patterns=patterns, + ) + assert handler.index_patterns == patterns + + def test_read(self, ti): + ts = pendulum.now() + logs, metadatas = self.os_task_handler.read( + ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} + ) + + assert 1 == len(logs) + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert ( + logs[0][0][-1] == "Dependencies all met for dep_context=non-requeueable" + " deps ti=\n" + "Starting attempt 1 of 1\nExecuting " + "on 2023-07-09 07:47:32+00:00" + ) + assert not metadatas[0]["end_of_log"] + assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + + def test_read_with_patterns(self, ti): + ts = pendulum.now() + with mock.patch.object(self.os_task_handler, "index_patterns", new="test_*,other_*"): + logs, metadatas = self.os_task_handler.read( + ti, 1, {"offset": 0, 
"last_log_timestamp": str(ts), "end_of_log": False} + ) + + assert 1 == len(logs) + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert ( + logs[0][0][-1] == "Dependencies all met for dep_context=non-requeueable" + " deps ti=\n" + "Starting attempt 1 of 1\nExecuting " + "on 2023-07-09 07:47:32+00:00" + ) + assert not metadatas[0]["end_of_log"] + assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + + def test_read_with_patterns_no_match(self, ti): + ts = pendulum.now() + with mock.patch.object(self.os_task_handler, "index_patterns", new="test_other_*,test_another_*"): + with mock.patch.object( + self.os_task_handler.client, + "search", + return_value={ + "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total": 7}, + "hits": {"hits": []}, + "timed_out": False, + "took": 7, + }, + ): + logs, metadatas = self.os_task_handler.read( + ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} + ) + + assert 1 == len(logs) + assert len(logs) == len(metadatas) + assert [[]] == logs + assert not metadatas[0]["end_of_log"] + assert "0" == metadatas[0]["offset"] + # last_log_timestamp won't change if no log lines read. + assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + + def test_read_with_missing_index(self, ti): + ts = pendulum.now() + with mock.patch.object(self.os_task_handler, "index_patterns", new="nonexistent,test_*"): + with mock.patch.object( + self.os_task_handler.client, "count", side_effect=NotFoundError(404, "IndexNotFoundError") + ): + with pytest.raises(NotFoundError, match=r"IndexNotFoundError"): + self.os_task_handler.read( + ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} + ) + + @pytest.mark.parametrize("seconds", [3, 6]) + def test_read_missing_logs(self, seconds, create_task_instance): + """ + When the log actually isn't there to be found, we only want to wait for 5 seconds. + In this case we expect to receive a message of the form 'Log {log_id} not found in Opensearch ...' 
+ """ + ti = get_ti( + self.DAG_ID, + self.TASK_ID, + pendulum.instance(self.EXECUTION_DATE).add(days=1), # so logs are not found + create_task_instance=create_task_instance, + ) + ts = pendulum.now().add(seconds=-seconds) + with mock.patch.object( + self.os_task_handler.client, + "search", + return_value={ + "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total": 7}, + "hits": {"hits": []}, + "timed_out": False, + "took": 7, + }, + ): + logs, metadatas = self.os_task_handler.read(ti, 1, {"offset": 0, "last_log_timestamp": str(ts)}) + + assert 1 == len(logs) + if seconds > 5: + # we expect a log not found message when checking began more than 5 seconds ago + assert len(logs[0]) == 1 + actual_message = logs[0][0][1] + expected_pattern = r"^\*\*\* Log .* not found in Opensearch.*" + assert re.match(expected_pattern, actual_message) is not None + assert metadatas[0]["end_of_log"] is True + else: + # we've "waited" less than 5 seconds so it should not be "end of log" and should be no log message + assert len(logs[0]) == 0 + assert logs == [[]] + assert metadatas[0]["end_of_log"] is False + assert len(logs) == len(metadatas) + assert "0" == metadatas[0]["offset"] + assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + + def test_read_with_none_metadata(self, ti): + logs, metadatas = self.os_task_handler.read(ti, 1) + assert 1 == len(logs) + assert len(logs) == len(metadatas) + assert ( + logs[0][0][-1] == "Dependencies all met for dep_context=non-requeueable" + " deps ti=\n" + "Starting attempt 1 of 1\nExecuting " + "on 2023-07-09 07:47:32+00:00" + ) + assert not metadatas[0]["end_of_log"] + assert timezone.parse(metadatas[0]["last_log_timestamp"]) < pendulum.now() + + def test_set_context(self, ti): + self.os_task_handler.set_context(ti) + assert self.os_task_handler.mark_end_on_close + + def test_set_context_w_json_format_and_write_stdout(self, ti): + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + self.os_task_handler.formatter = formatter + self.os_task_handler.write_stdout = True + self.os_task_handler.json_format = True + self.os_task_handler.set_context(ti) + + def test_close(self, ti): + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + self.os_task_handler.formatter = formatter + + self.os_task_handler.set_context(ti) + self.os_task_handler.close() + with open( + os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1)) + ) as log_file: + # end_of_log_mark may contain characters like '\n' which is needed to + # have the log uploaded but will not be stored in elasticsearch. 
+            # so apply the strip() to log_file.read()
+            log_line = log_file.read().strip()
+            assert log_line.endswith(self.end_of_log_mark.strip())
+        assert self.os_task_handler.closed
+
+    def test_close_no_mark_end(self, ti):
+        ti.raw = True
+        self.os_task_handler.set_context(ti)
+        self.os_task_handler.close()
+        with open(
+            os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
+        ) as log_file:
+            assert self.end_of_log_mark not in log_file.read()
+        assert self.os_task_handler.closed
+
+    def test_close_closed(self, ti):
+        self.os_task_handler.closed = True
+        self.os_task_handler.set_context(ti)
+        self.os_task_handler.close()
+        with open(
+            os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
+        ) as log_file:
+            assert 0 == len(log_file.read())
+
+    def test_close_with_no_handler(self, ti):
+        self.os_task_handler.set_context(ti)
+        self.os_task_handler.handler = None
+        self.os_task_handler.close()
+        with open(
+            os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
+        ) as log_file:
+            assert 0 == len(log_file.read())
+        assert self.os_task_handler.closed
+
+    def test_close_with_no_stream(self, ti):
+        self.os_task_handler.set_context(ti)
+        self.os_task_handler.handler.stream = None
+        self.os_task_handler.close()
+        with open(
+            os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
+        ) as log_file:
+            assert self.end_of_log_mark in log_file.read()
+        assert self.os_task_handler.closed
+
+        self.os_task_handler.set_context(ti)
+        self.os_task_handler.handler.stream.close()
+        self.os_task_handler.close()
+        with open(
+            os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
+        ) as log_file:
+            assert self.end_of_log_mark in log_file.read()
+        assert self.os_task_handler.closed
+
+    def test_render_log_id(self, ti):
+        assert self.LOG_ID == self.os_task_handler._render_log_id(ti, 1)
+
+        self.os_task_handler.json_format = True
+        assert self.JSON_LOG_ID == self.os_task_handler._render_log_id(ti, 1)
+
+    def test_clean_date(self):
+        clean_execution_date = self.os_task_handler._clean_date(datetime(2016, 7, 8, 9, 10, 11, 12))
+        assert "2016_07_08T09_10_11_000012" == clean_execution_date
+
+    @mock.patch("sys.__stdout__", new_callable=StringIO)
+    def test_dynamic_offset(self, stdout_mock, ti, time_machine):
+        # arrange
+        handler = OpensearchTaskHandler(
+            base_log_folder=self.local_log_location,
+            end_of_log_mark=self.end_of_log_mark,
+            write_stdout=True,
+            host=self.host,
+            port=self.port,
+            username=self.username,
+            password=self.password,
+            json_format=True,
+            json_fields=self.json_fields,
+            host_field=self.host_field,
+            offset_field=self.offset_field,
+        )
+        handler.formatter = logging.Formatter()
+
+        logger = logging.getLogger(__name__)
+        logger.handlers = [handler]
+        logger.propagate = False
+
+        ti._log = logger
+        handler.set_context(ti)
+
+        t1 = pendulum.local(year=2017, month=1, day=1, hour=1, minute=1, second=15)
+        t2, t3 = t1 + pendulum.duration(seconds=5), t1 + pendulum.duration(seconds=10)
+
+        # act
+        time_machine.move_to(t1, tick=False)
+        ti.log.info("Test")
+        time_machine.move_to(t2, tick=False)
+        ti.log.info("Test2")
+        time_machine.move_to(t3, tick=False)
+        ti.log.info("Test3")
+
+        # assert
+        first_log, second_log, third_log = map(json.loads, stdout_mock.getvalue().strip().splitlines())
+        assert first_log["offset"] < second_log["offset"] < third_log["offset"]
+        assert first_log["asctime"] == t1.format("YYYY-MM-DDTHH:mm:ss.SSSZZ")
+        assert second_log["asctime"] == t2.format("YYYY-MM-DDTHH:mm:ss.SSSZZ")
t2.format("YYYY-MM-DDTHH:mm:ss.SSSZZ") + assert third_log["asctime"] == t3.format("YYYY-MM-DDTHH:mm:ss.SSSZZ") + + def test_get_index_patterns_with_callable(self): + with patch("airflow.providers.opensearch.log.os_task_handler.import_string") as mock_import_string: + mock_callable = Mock(return_value="callable_index_pattern") + mock_import_string.return_value = mock_callable + + self.os_task_handler.index_patterns_callable = "path.to.index_pattern_callable" + result = self.os_task_handler._get_index_patterns({}) + + mock_import_string.assert_called_once_with("path.to.index_pattern_callable") + mock_callable.assert_called_once_with({}) + assert result == "callable_index_pattern" + + +def test_safe_attrgetter(): + class A: ... + + a = A() + a.b = "b" + a.c = None + a.x = a + a.x.d = "blah" + assert getattr_nested(a, "b", None) == "b" # regular getattr + assert getattr_nested(a, "x.d", None) == "blah" # nested val + assert getattr_nested(a, "aa", "heya") == "heya" # respects non-none default + assert getattr_nested(a, "c", "heya") is None # respects none value + assert getattr_nested(a, "aa", None) is None # respects none default + + +def test_retrieve_config_keys(): + """ + Tests that the OpensearchTaskHandler retrieves the correct configuration keys from the config file. + * old_parameters are removed + * parameters from config are automatically added + * constructor parameters missing from config are also added + :return: + """ + with conf_vars( + { + ("opensearch_configs", "http_compress"): "False", + ("opensearch_configs", "timeout"): "10", + } + ): + args_from_config = get_os_kwargs_from_config().keys() + # verify_certs comes from default config value + assert "verify_certs" in args_from_config + # timeout comes from config provided value + assert "timeout" in args_from_config + # http_compress comes from config value + assert "http_compress" in args_from_config + assert "self" not in args_from_config