-
Notifications
You must be signed in to change notification settings - Fork 87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial Python telemetry #1972
base: main
Are you sure you want to change the base?
Initial Python telemetry #1972
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,304 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
|
||
""" | ||
This module sends telemetry directly to Azure Monitor using a similar mechanism and | ||
format to the Azure Monitor OpenTelemetry Python SDK. It only supports custom metrics of | ||
type "counter" and "histogram" for now. It's goal is to be minimal in size and dependencies, | ||
and easy to read to understand exactly what data is being sent. | ||
|
||
To use this API, simply call `log_telemetry` with the metric name, value, and any other | ||
optional properties. The telemetry will be batched and sent at a regular intervals (60 sec), | ||
and when the process is about to exit. | ||
|
||
Disable qsharp Python telemetry by setting the environment variable `QSHARP_PYTHON_TELEMETRY=none`. | ||
""" | ||
|
||
import atexit | ||
import locale | ||
import logging | ||
import os | ||
import platform | ||
import time | ||
import urllib3 | ||
import warnings | ||
|
||
from datetime import datetime, timezone | ||
from queue import SimpleQueue, Empty | ||
from threading import Thread | ||
from typing import TypedDict, Union | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
# TODO: These should be potentially set by the build pipeline. However, they are not | ||
# secret values, and it might be useful to still get telemetry even from other builds or forks | ||
AIKEY = os.environ.get("QSHARP_AI_KEY") or "5ef527ea-7943-4e42-866d-e3b291d44e2b" | ||
AIURL = ( | ||
os.environ.get("QSHARP_AI_URL") | ||
or "https://westus2-2.in.applicationinsights.azure.com//v2.1/track" | ||
) | ||
|
||
TELEMETRY_ENABLED = not os.environ.get("QSHARP_PYTHON_TELEMETRY") == "none" | ||
BATCH_INTERVAL_SEC = int(os.environ.get("QSHARP_PYTHON_TELEMETRY_INTERVAL") or 60) | ||
|
||
|
||
# The below is taken from the Azure Monitor Python SDK | ||
def _getlocale(): | ||
try: | ||
with warnings.catch_warnings(): | ||
# Workaround for https://github.com/python/cpython/issues/82986 by continuing to use getdefaultlocale() even though it has been deprecated. | ||
# Ignore the deprecation warnings to reduce noise | ||
warnings.simplefilter("ignore", category=DeprecationWarning) | ||
return locale.getdefaultlocale()[0] | ||
except AttributeError: | ||
# Use this as a fallback if locale.getdefaultlocale() doesn't exist (>Py3.13) | ||
return locale.getlocale()[0] | ||
|
||
|
||
# Minimal device information to include with telemetry | ||
AI_DEVICE_LOCALE = _getlocale() | ||
AI_DEVICE_OS_VERSION = platform.version() | ||
|
||
|
||
class Metric(TypedDict): | ||
"""Used internally for objects in the telemetry queue""" | ||
|
||
name: str | ||
value: float | ||
count: int | ||
properties: dict | ||
type: str | ||
|
||
|
||
class PendingMetric(Metric): | ||
"""Used internally to aggregate metrics before sending""" | ||
|
||
min: float | ||
max: float | ||
|
||
|
||
# Maintain a collection of custom metrics to log, stored by metric name with a list entry | ||
# for each unique set of properties per metric name | ||
pending_metrics: dict[str, list[PendingMetric]] = {} | ||
|
||
# The telemetry queue is used to send telemetry from the main thread to the telemetry thread | ||
# This simplifies any thread-safety concerns, and avoids the need for locks, etc. | ||
telemetry_queue = SimpleQueue() | ||
|
||
|
||
def log_telemetry( | ||
name: str, | ||
value: float, | ||
count: int = 1, | ||
properties: dict = {}, | ||
type: str = "counter", | ||
): | ||
""" | ||
Logs a custom metric with the name provided. Properties are optional and can be used to | ||
capture additional context about the metric (but should be a relatively static set of values, as | ||
each unique set of properties will be sent as a separate metric and creates a separate 'dimension' | ||
in the backend telemetry store). | ||
|
||
The type can be either 'counter' or 'histogram'. A 'counter' is a simple value that is summed | ||
over time, such as how many times an event occurs, while a 'histogram' is used to track 'quantative' | ||
values, such as the distribution of values over time, e.g., the duration of an operation. | ||
|
||
Example usage for a counter: | ||
|
||
log_telemetry("qir_generated", 1, properties={"profile": "base", "qsharp.version": "1.9.0"}) | ||
|
||
Example usage for a histogram: | ||
|
||
log_telemetry("simulation_duration", 123.45, type="histogram") | ||
|
||
""" | ||
if not TELEMETRY_ENABLED: | ||
return | ||
obj: Metric = { | ||
"name": name, | ||
"value": value, | ||
"count": count, | ||
"properties": properties, | ||
"type": type, | ||
} | ||
|
||
logger.debug("Queuing telemetry: %s", obj) | ||
telemetry_queue.put(obj) | ||
|
||
|
||
def flush_telemetry(): | ||
"""POSTs any pending telemetry immediately to the collector endpoint""" | ||
if not TELEMETRY_ENABLED: | ||
return | ||
telemetry_queue.put("flush") | ||
|
||
|
||
def add_to_pending(metric: Metric): | ||
"""Used by the telemetry thread to aggregate metrics before sending""" | ||
|
||
if metric["type"] not in ["counter", "histogram"]: | ||
raise Exception("Metric must be of type counter or histogram") | ||
|
||
# Get or create the entry list for this name | ||
name_entries = pending_metrics.setdefault(metric["name"], []) | ||
|
||
# Try to find the entry with matching properties | ||
# This relies on the fact dicts with matching keys/values compare equal in Python | ||
prop_entry = next( | ||
( | ||
entry | ||
for entry in name_entries | ||
if entry["properties"] == metric["properties"] | ||
), | ||
None, | ||
) | ||
if prop_entry is None: | ||
new_entry: PendingMetric = { | ||
**metric, | ||
"min": metric["value"], | ||
"max": metric["value"], | ||
} | ||
name_entries.append(new_entry) | ||
else: | ||
if prop_entry["type"] != metric["type"]: | ||
raise Exception("Cannot mix counter and histogram for the same metric name") | ||
prop_entry["value"] += metric["value"] | ||
prop_entry["count"] += metric["count"] | ||
prop_entry["min"] = min(prop_entry["min"], metric["value"]) | ||
prop_entry["max"] = max(prop_entry["max"], metric["value"]) | ||
|
||
|
||
def pending_to_payload(): | ||
"""Converts the pending metrics to the JSON payload (as a list of dicts) for Azure Monitor""" | ||
|
||
result_array = [] | ||
formatted_time = ( | ||
datetime.now(timezone.utc) | ||
.isoformat(timespec="microseconds") | ||
.replace("+00:00", "Z") | ||
) | ||
for name in pending_metrics: | ||
for unique_props in pending_metrics[name]: | ||
# The below matches the entry format for Azure Monitor REST API | ||
entry = { | ||
"ver": 1, | ||
"name": "Microsoft.ApplicationInsights.Metric", | ||
"time": formatted_time, | ||
"sampleRate": 100.0, | ||
"iKey": AIKEY, | ||
"tags": { | ||
"ai.device.locale": AI_DEVICE_LOCALE, | ||
"ai.device.osVersion": AI_DEVICE_OS_VERSION, | ||
}, | ||
"data": { | ||
"baseType": "MetricData", | ||
"baseData": { | ||
"ver": 2, | ||
"metrics": [ | ||
{ | ||
"name": unique_props["name"], | ||
"value": unique_props["value"], | ||
"count": unique_props["count"], | ||
} | ||
], | ||
"properties": unique_props["properties"], | ||
}, | ||
}, | ||
} | ||
# Histogram values differ only in that they have min/max values also | ||
if unique_props["type"] == "histogram": | ||
entry["data"]["baseData"]["metrics"][0]["min"] = unique_props["min"] | ||
entry["data"]["baseData"]["metrics"][0]["max"] = unique_props["max"] | ||
|
||
result_array.append(entry) | ||
|
||
return result_array | ||
|
||
|
||
def post_telemetry(): | ||
"""Posts the pending telemetry to Azure Monitor""" | ||
|
||
if not TELEMETRY_ENABLED: | ||
return | ||
|
||
if len(pending_metrics) == 0: | ||
return True # Nothing to send | ||
|
||
payload = pending_to_payload() | ||
logger.debug("Sending telemetry request: %s", payload) | ||
try: | ||
response = urllib3.request( | ||
"POST", AIURL, timeout=5, retries=False, json=payload | ||
) | ||
logger.debug("Telemetry response: %s", response.status) | ||
# On a successful post, clear the pending list. (Else they will be included on the next retry) | ||
pending_metrics.clear() | ||
return True | ||
except Exception as e: | ||
logger.exception("Failed to post telemetry") | ||
return False | ||
|
||
|
||
# This is the thread that aggregates and posts telemetry at a regular interval. | ||
# The main thread will signal the thread loop to exit when the process is about to exit. | ||
def telemetry_thread_start(): | ||
next_post_time_sec: Union[float, None] = None | ||
|
||
def on_metric(msg: Metric): | ||
nonlocal next_post_time_sec | ||
|
||
# Add to the pending batch to send next | ||
add_to_pending(msg) | ||
|
||
# Schedule the next post if we don't have one scheduled | ||
if next_post_time_sec == None: | ||
next_post_time_sec = time.monotonic() + BATCH_INTERVAL_SEC | ||
|
||
while True: | ||
try: | ||
# Block if no timeout, else wait a maximum of time until the next post is due | ||
timeout = ( | ||
None | ||
if next_post_time_sec == None | ||
else max(next_post_time_sec - time.monotonic(), 0) | ||
) | ||
msg = telemetry_queue.get(timeout=timeout) | ||
|
||
if msg == "exit": | ||
logger.debug("Exiting telemetry thread") | ||
if not post_telemetry(): | ||
logger.error("Failed to post telemetry on exit") | ||
return | ||
elif msg == "flush": | ||
logger.debug("Flushing telemetry") | ||
if not post_telemetry(): | ||
logger.error("Failed to post telemetry on flush") | ||
else: | ||
on_metric(msg) | ||
# Loop until the queue has been drained. This will cause the 'Empty' exception | ||
# below once the queue is empty and it's time to post | ||
continue | ||
except Empty: | ||
# No more telemetry within timeout, so write what we have pending | ||
post_telemetry() | ||
|
||
# If we get here, it's after a post attempt. Pending will still have items if the attempt | ||
# failed, so updated the time for the next attempt in that case. | ||
if len(pending_metrics) == 0: | ||
next_post_time_sec = None | ||
else: | ||
next_post_time_sec = time.monotonic() + BATCH_INTERVAL_SEC | ||
|
||
|
||
# When the process is about to exit, notify the telemetry thread to flush, and wait max 3 sec before exiting anyway | ||
def on_exit(): | ||
logger.debug("In on_exit handler") | ||
telemetry_queue.put("exit") | ||
telemetry_thread.join(timeout=3) | ||
|
||
|
||
# Mark the telemetry thread as a deamon thread, else it will keep the process alive when the main thread exits | ||
if TELEMETRY_ENABLED: | ||
telemetry_thread = Thread(target=telemetry_thread_start, daemon=True) | ||
telemetry_thread.start() | ||
atexit.register(on_exit) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
|
||
from .telemetry import log_telemetry | ||
import math | ||
|
||
# TODO: This should be populated by the build in the main module | ||
Check notice Code scanning / devskim A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note
Suspicious comment
|
||
QSHARP_VERSION = "1.9.0" | ||
|
||
# TODO: Log extra params like qubit count (buckets), qubit type for RE, etc.? | ||
Check notice Code scanning / devskim A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note
Suspicious comment
|
||
|
||
default_props = {"qsharp.version": QSHARP_VERSION} | ||
|
||
# For metrics such as duration, we want to capture things like how many shots or qubits in | ||
# the additional properties. However properties shouldn't be 'continuous' values, as they | ||
# create new 'dimensions' on the backend, which is limited, thus we want to bucket these properties. | ||
|
||
# See some of the notes at: https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-custom-overview#design-limitations-and-considerations | ||
|
||
|
||
def get_shots_bucket(shots: int): | ||
if shots <= 1: | ||
return 1 | ||
elif shots >= 1000000: | ||
# Limit the buckets upper bound | ||
return 1000000 | ||
else: | ||
# Bucket into nearest (rounded up) power of 10, e.g. 75 -> 100, 450 -> 1000, etc. | ||
return 10 ** math.ceil(math.log10(shots)) | ||
|
||
|
||
def on_init(): | ||
log_telemetry("qsharp.init", 1, properties=default_props) | ||
|
||
|
||
def on_run(shots: int): | ||
log_telemetry( | ||
"qsharp.simulate", | ||
1, | ||
properties={**default_props, shots: get_shots_bucket(shots)}, | ||
) | ||
|
||
|
||
def on_run_end(durationMs: float, shots: int): | ||
log_telemetry( | ||
"qsharp.simulate.durationMs", | ||
durationMs, | ||
properties={**default_props, shots: get_shots_bucket(shots)}, | ||
type="histogram", | ||
) | ||
|
||
|
||
def on_compile(profile: str) -> None: | ||
log_telemetry("qsharp.compile", 1, properties={**default_props, "profile": profile}) | ||
|
||
|
||
def on_compile_end(durationMs: float, profile: str) -> None: | ||
log_telemetry( | ||
"qsharp.compile.durationMs", | ||
durationMs, | ||
properties={**default_props, "profile": profile}, | ||
type="histogram", | ||
) | ||
|
||
|
||
def on_estimate(): | ||
log_telemetry("qsharp.estimate", 1, properties=default_props) | ||
|
||
|
||
def on_estimate_end(durationMs: float): | ||
log_telemetry( | ||
"qsharp.estimate.durationMs", | ||
durationMs, | ||
properties=default_props, | ||
type="histogram", | ||
) |
Check notice
Code scanning / devskim
A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note