diff --git a/pip/qsharp/__init__.py b/pip/qsharp/__init__.py index 50f3224b90..7e51c74089 100644 --- a/pip/qsharp/__init__.py +++ b/pip/qsharp/__init__.py @@ -1,6 +1,7 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +from . import telemetry_events from ._qsharp import ( init, eval, @@ -16,6 +17,8 @@ ShotResult, ) +telemetry_events.on_import() + from ._native import Result, Pauli, QSharpError, TargetProfile # IPython notebook specific features diff --git a/pip/qsharp/_qsharp.py b/pip/qsharp/_qsharp.py index f617384f31..2cbfeb8501 100644 --- a/pip/qsharp/_qsharp.py +++ b/pip/qsharp/_qsharp.py @@ -1,6 +1,7 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +from . import telemetry_events from ._native import ( Interpreter, TargetProfile, @@ -22,6 +23,7 @@ from .estimator._estimator import EstimatorResult, EstimatorParams import json import os +from time import monotonic _interpreter = None @@ -234,6 +236,9 @@ def run( if shots < 1: raise QSharpError("The number of shots must be greater than 0.") + telemetry_events.on_run(shots) + start_time = monotonic() + results: List[ShotResult] = [] def print_output(output: Output) -> None: @@ -263,6 +268,9 @@ def on_save_events(output: Output) -> None: # compilation. entry_expr = None + durationMs = (monotonic() - start_time) * 1000 + telemetry_events.on_run_end(durationMs, shots) + if save_events: return results else: diff --git a/pip/qsharp/telemetry.py b/pip/qsharp/telemetry.py new file mode 100644 index 0000000000..f7b266cd3f --- /dev/null +++ b/pip/qsharp/telemetry.py @@ -0,0 +1,308 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +This module sends telemetry directly to Azure Monitor using a similar mechanism and +format to the Azure Monitor OpenTelemetry Python SDK. It only supports custom metrics of +type "counter" and "histogram" for now. It's goal is to be minimal in size and dependencies, +and easy to read to understand exactly what data is being sent. + +To use this API, simply call `log_telemetry` with the metric name, value, and any other +optional properties. The telemetry will be batched and sent at a regular intervals (60 sec), +and when the process is about to exit. + +Disable qsharp Python telemetry by setting the environment variable `QSHARP_PYTHON_TELEMETRY=none`. +""" + +import atexit +import json +import locale +import logging +import os +import platform +import time +import urllib.request +import warnings + +from datetime import datetime, timezone +from queue import SimpleQueue, Empty +from threading import Thread +from typing import TypedDict, Union + +logger = logging.getLogger(__name__) + +# TODO: These should be potentially set by the build pipeline. However, they are not +# secret values, and it might be useful to still get telemetry even from other builds or forks +AIKEY = os.environ.get("QSHARP_AI_KEY") or "5ef527ea-7943-4e42-866d-e3b291d44e2b" +AIURL = ( + os.environ.get("QSHARP_AI_URL") + or "https://westus2-2.in.applicationinsights.azure.com//v2.1/track" +) + +TELEMETRY_ENABLED = not os.environ.get("QSHARP_PYTHON_TELEMETRY") == "none" +BATCH_INTERVAL_SEC = int(os.environ.get("QSHARP_PYTHON_TELEMETRY_INTERVAL") or 60) + + +# The below is taken from the Azure Monitor Python SDK +def _getlocale(): + try: + with warnings.catch_warnings(): + # Workaround for https://github.com/python/cpython/issues/82986 by continuing to use getdefaultlocale() even though it has been deprecated. + # Ignore the deprecation warnings to reduce noise + warnings.simplefilter("ignore", category=DeprecationWarning) + return locale.getdefaultlocale()[0] + except AttributeError: + # Use this as a fallback if locale.getdefaultlocale() doesn't exist (>Py3.13) + return locale.getlocale()[0] + + +# Minimal device information to include with telemetry +AI_DEVICE_LOCALE = _getlocale() +AI_DEVICE_OS_VERSION = platform.version() + + +class Metric(TypedDict): + """Used internally for objects in the telemetry queue""" + + name: str + value: float + count: int + properties: dict + type: str + + +class PendingMetric(Metric): + """Used internally to aggregate metrics before sending""" + + min: float + max: float + + +# Maintain a collection of custom metrics to log, stored by metric name with a list entry +# for each unique set of properties per metric name +pending_metrics: dict[str, list[PendingMetric]] = {} + +# The telemetry queue is used to send telemetry from the main thread to the telemetry thread +# This simplifies any thread-safety concerns, and avoids the need for locks, etc. +telemetry_queue = SimpleQueue() + + +def log_telemetry( + name: str, + value: float, + count: int = 1, + properties: dict = {}, + type: str = "counter", +): + """ + Logs a custom metric with the name provided. Properties are optional and can be used to + capture additional context about the metric (but should be a relatively static set of values, as + each unique set of properties will be sent as a separate metric and creates a separate 'dimension' + in the backend telemetry store). + + The type can be either 'counter' or 'histogram'. A 'counter' is a simple value that is summed + over time, such as how many times an event occurs, while a 'histogram' is used to track 'quantative' + values, such as the distribution of values over time, e.g., the duration of an operation. + + Example usage for a counter: + + log_telemetry("qir_generated", 1, properties={"profile": "base", "qsharp.version": "1.9.0"}) + + Example usage for a histogram: + + log_telemetry("simulation_duration", 123.45, type="histogram") + + """ + if not TELEMETRY_ENABLED: + return + obj: Metric = { + "name": name, + "value": value, + "count": count, + "properties": properties, + "type": type, + } + + logger.debug("Queuing telemetry: %s", obj) + telemetry_queue.put(obj) + + +def flush_telemetry(): + """POSTs any pending telemetry immediately to the collector endpoint""" + if not TELEMETRY_ENABLED: + return + telemetry_queue.put("flush") + + +def _add_to_pending(metric: Metric): + """Used by the telemetry thread to aggregate metrics before sending""" + + if metric["type"] not in ["counter", "histogram"]: + raise Exception("Metric must be of type counter or histogram") + + # Get or create the entry list for this name + name_entries = pending_metrics.setdefault(metric["name"], []) + + # Try to find the entry with matching properties + # This relies on the fact dicts with matching keys/values compare equal in Python + prop_entry = next( + ( + entry + for entry in name_entries + if entry["properties"] == metric["properties"] + ), + None, + ) + if prop_entry is None: + new_entry: PendingMetric = { + **metric, + "min": metric["value"], + "max": metric["value"], + } + name_entries.append(new_entry) + else: + if prop_entry["type"] != metric["type"]: + raise Exception("Cannot mix counter and histogram for the same metric name") + prop_entry["value"] += metric["value"] + prop_entry["count"] += metric["count"] + prop_entry["min"] = min(prop_entry["min"], metric["value"]) + prop_entry["max"] = max(prop_entry["max"], metric["value"]) + + +def _pending_to_payload(): + """Converts the pending metrics to the JSON payload for Azure Monitor""" + + result_array = [] + formatted_time = ( + datetime.now(timezone.utc) + .isoformat(timespec="microseconds") + .replace("+00:00", "Z") + ) + for name in pending_metrics: + for unique_props in pending_metrics[name]: + # The below matches the entry format for Azure Monitor REST API + entry = { + "ver": 1, + "name": "Microsoft.ApplicationInsights.Metric", + "time": formatted_time, + "sampleRate": 100.0, + "iKey": AIKEY, + "tags": { + "ai.device.locale": AI_DEVICE_LOCALE, + "ai.device.osVersion": AI_DEVICE_OS_VERSION, + }, + "data": { + "baseType": "MetricData", + "baseData": { + "ver": 2, + "metrics": [ + { + "name": unique_props["name"], + "value": unique_props["value"], + "count": unique_props["count"], + } + ], + "properties": unique_props["properties"], + }, + }, + } + # Histogram values differ only in that they have min/max values also + if unique_props["type"] == "histogram": + entry["data"]["baseData"]["metrics"][0]["min"] = unique_props["min"] + entry["data"]["baseData"]["metrics"][0]["max"] = unique_props["max"] + + result_array.append(entry) + + return result_array + + +def _post_telemetry(): + """Posts the pending telemetry to Azure Monitor""" + + if not TELEMETRY_ENABLED: + return + + if len(pending_metrics) == 0: + return True # Nothing to send + + payload = json.dumps(_pending_to_payload()).encode("utf-8") + logger.debug("Sending telemetry request: %s", payload) + try: + request = urllib.request.Request(AIURL, data=payload, method="POST") + request.add_header("Content-Type", "application/json") + with urllib.request.urlopen(request, timeout=10) as response: + logger.debug("Telemetry response: %s", response.status) + # On a successful post, clear the pending list. (Else they will be included on the next retry) + pending_metrics.clear() + return True + + except Exception as e: + logger.exception( + "Failed to post telemetry. Pending metrics will be retried at the next interval." + ) + return False + + +# This is the thread that aggregates and posts telemetry at a regular interval. +# The main thread will signal the thread loop to exit when the process is about to exit. +def _telemetry_thread_start(): + next_post_sec: Union[float, None] = None + + def on_metric(msg: Metric): + nonlocal next_post_sec + + # Add to the pending batch to send next + _add_to_pending(msg) + + # Schedule the next post if we don't have one scheduled + if next_post_sec == None: + next_post_sec = time.monotonic() + BATCH_INTERVAL_SEC + + while True: + try: + # Block if no timeout, else wait a maximum of time until the next post is due + timeout = ( + None + if next_post_sec == None + else max(next_post_sec - time.monotonic(), 0) + ) + msg = telemetry_queue.get(timeout=timeout) + + if msg == "exit": + logger.debug("Exiting telemetry thread") + if not _post_telemetry(): + logger.error("Failed to post telemetry on exit") + return + elif msg == "flush": + logger.debug("Flushing telemetry") + if not _post_telemetry(): + logger.error("Failed to post telemetry on flush") + else: + on_metric(msg) + # Loop until the queue has been drained. This will cause the 'Empty' exception + # below once the queue is empty and it's time to post + continue + except Empty: + # No more telemetry within timeout, so write what we have pending + _post_telemetry() + + # If we get here, it's after a post attempt. Pending will still have items if the attempt + # failed, so updated the time for the next attempt in that case. + if len(pending_metrics) == 0: + next_post_sec = None + else: + next_post_sec = time.monotonic() + BATCH_INTERVAL_SEC + + +# When the process is about to exit, notify the telemetry thread to flush, and wait max 3 sec before exiting anyway +def on_exit(): + logger.debug("In on_exit handler") + telemetry_queue.put("exit") + telemetry_thread.join(timeout=3) + + +# Mark the telemetry thread as a deamon thread, else it will keep the process alive when the main thread exits +if TELEMETRY_ENABLED: + telemetry_thread = Thread(target=_telemetry_thread_start, daemon=True) + telemetry_thread.start() + atexit.register(on_exit) diff --git a/pip/qsharp/telemetry_events.py b/pip/qsharp/telemetry_events.py new file mode 100644 index 0000000000..3c38d7041a --- /dev/null +++ b/pip/qsharp/telemetry_events.py @@ -0,0 +1,76 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from .telemetry import log_telemetry +import math + +# TODO: This should be populated by the build in the main module +QSHARP_VERSION = "1.9.0" + +# TODO: Log extra params like qubit count (buckets), qubit type for RE, etc.? + +default_props = {"qsharp.version": QSHARP_VERSION} + +# For metrics such as duration, we want to capture things like how many shots or qubits in +# the additional properties. However properties shouldn't be 'continuous' values, as they +# create new 'dimensions' on the backend, which is limited, thus we want to bucket these properties. + +# See some of the notes at: https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-custom-overview#design-limitations-and-considerations + + +def get_shots_bucket(shots: int): + if shots <= 1: + return 1 + elif shots >= 1000000: + # Limit the buckets upper bound + return 1000000 + else: + # Bucket into nearest (rounded up) power of 10, e.g. 75 -> 100, 450 -> 1000, etc. + return 10 ** math.ceil(math.log10(shots)) + + +def on_import(): + log_telemetry("qsharp.import", 1, properties=default_props) + + +def on_run(shots: int): + log_telemetry( + "qsharp.run", + 1, + properties={**default_props, "shots": get_shots_bucket(shots)}, + ) + + +def on_run_end(durationMs: float, shots: int): + log_telemetry( + "qsharp.run.durationMs", + durationMs, + properties={**default_props, "shots": get_shots_bucket(shots)}, + type="histogram", + ) + + +def on_compile(profile: str) -> None: + log_telemetry("qsharp.compile", 1, properties={**default_props, "profile": profile}) + + +def on_compile_end(durationMs: float, profile: str) -> None: + log_telemetry( + "qsharp.compile.durationMs", + durationMs, + properties={**default_props, "profile": profile}, + type="histogram", + ) + + +def on_estimate(): + log_telemetry("qsharp.estimate", 1, properties=default_props) + + +def on_estimate_end(durationMs: float): + log_telemetry( + "qsharp.estimate.durationMs", + durationMs, + properties=default_props, + type="histogram", + )