From 6eb5bc1c5aab1186827b964f8e4ee818aefab954 Mon Sep 17 00:00:00 2001 From: Zaeem Athar Date: Thu, 18 Apr 2024 01:35:44 +0200 Subject: [PATCH] Accessing values through config. --- dlt/common/runtime/segment.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/dlt/common/runtime/segment.py b/dlt/common/runtime/segment.py index 011dcc9d61..9c7aa1d880 100644 --- a/dlt/common/runtime/segment.py +++ b/dlt/common/runtime/segment.py @@ -28,25 +28,25 @@ def init_segment(config: RunConfiguration) -> None: - global _WRITE_KEY, _SESSION, _SEGMENT_ENDPOINT - if "RUNTIME__TELEMETRY_ENDPOINT" not in os.environ: + if config.dlthub_telemetry_endpoint is None: + raise ValueError("dlthub_telemetry_endpoint not specified in RunConfiguration") + + if config.dlthub_telemetry_endpoint == "https://api.segment.io/v1/track": assert ( config.dlthub_telemetry_segment_write_key ), "dlthub_telemetry_segment_write_key not present in RunConfiguration" - # store write key - key_bytes = (config.dlthub_telemetry_segment_write_key + ":").encode("ascii") - _WRITE_KEY = base64.b64encode(key_bytes).decode("utf-8") - # store endpoint - _SEGMENT_ENDPOINT = config.dlthub_telemetry_endpoint - else: - _SEGMENT_ENDPOINT = os.environ["RUNTIME__TELEMETRY_ENDPOINT"] - _WRITE_KEY = None + global _WRITE_KEY, _SESSION, _SEGMENT_ENDPOINT # create thread pool to send telemetry to segment if not _SESSION: _SESSION = requests.Session() # flush pool on exit atexit.register(_at_exit_cleanup) + # store write key + key_bytes = (config.dlthub_telemetry_segment_write_key + ":").encode("ascii") + _WRITE_KEY = base64.b64encode(key_bytes).decode("utf-8") + # store endpoint + _SEGMENT_ENDPOINT = config.dlthub_telemetry_endpoint # cache the segment context _default_context_fields() @@ -101,13 +101,13 @@ def _segment_request_header(write_key: str) -> StrAny: Returns: Authentication headers for segment. """ - if not write_key: + if _SEGMENT_ENDPOINT == "https://api.segment.io/v1/track": return { + "Authorization": "Basic {}".format(write_key), "Content-Type": "application/json", } else: return { - "Authorization": "Basic {}".format(write_key), "Content-Type": "application/json", } @@ -181,9 +181,9 @@ def _send_event(event_name: str, properties: StrAny, context: StrAny) -> None: logger.debug("Skipping request to external service: payload was filtered out.") return - if not _WRITE_KEY and _SEGMENT_ENDPOINT != RunConfiguration.dlthub_telemetry_endpoint: - # If _WRITE_KEY is empty or `None`, telemetry has not been enabled - logger.debug("Skipping request to external service: telemetry key not set.") + if _SEGMENT_ENDPOINT is None: + # If _SEGMENT_ENDPOINT is `None`, telemetry has not been enabled + logger.debug("Skipping request to external service: telemetry endpoint not set.") return headers = _segment_request_header(_WRITE_KEY) @@ -198,7 +198,7 @@ def _future_send() -> None: # elapsed_time = (end_ts - start_ts) / 10e6 # print(f"SENDING TO Segment done: {elapsed_time}ms Status: {resp.status_code}") # handle different failure cases - if resp.status_code != 200 or resp.status_code != 204: + if resp.status_code not in [200, 204]: logger.debug( f"Segment telemetry request returned a {resp.status_code} response. " f"Body: {resp.text}"