From d1587ae4bb9e1215082f4ac5aa1485af6d0bd1e0 Mon Sep 17 00:00:00 2001 From: Paul Gear Date: Wed, 27 Dec 2023 06:59:50 +1000 Subject: [PATCH 1/3] Fix non-matching backticks in README --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index fac9eed..2234f6b 100644 --- a/README.md +++ b/README.md @@ -130,9 +130,9 @@ configure this to listen on a host and/or port other than the default ## Startup delay By default, until the NTP server has been running for 512 seconds (the minimum -time for 8 polls at 64-second intervals), `check_ntpmon`` will return OK (zero +time for 8 polls at 64-second intervals), `check_ntpmon` will return OK (zero return code). This is to prevent false positives on startup or for short-lived -VMs. To ignore this safety precaution, use `--run-time`` with a low number +VMs. To ignore this safety precaution, use `--run-time` with a low number (e.g. 1 sec). ## To do From c8bc76f9db5a7efde20377153952f067114cc9eb Mon Sep 17 00:00:00 2001 From: Paul Gear Date: Wed, 27 Dec 2023 17:58:18 +1000 Subject: [PATCH 2/3] Separate output from alerts Use underscores natively for all metric names Handle all command line argument processing in get_args() Implement peerstats for collectd & prometheus --- src/alert.py | 186 ++-------------------- src/line_protocol.py | 5 +- src/ntpmon.py | 131 +++++++-------- src/outputs.py | 289 ++++++++++++++++++++++++++++++++++ src/peer_stats.py | 40 ++--- unit_tests/test_peer_stats.py | 2 +- 6 files changed, 381 insertions(+), 272 deletions(-) create mode 100644 src/outputs.py diff --git a/src/alert.py b/src/alert.py index 46325de..ffeba4b 100644 --- a/src/alert.py +++ b/src/alert.py @@ -10,12 +10,9 @@ """ import pprint -import sys -import line_protocol import metrics - -from io import TextIOWrapper +import outputs from classifier import MetricClassifier @@ -78,76 +75,25 @@ } -""" -Metric types for collectd -""" -_collectdtypes = { - "frequency": "frequency/frequency_offset", - "offset": "offset/time_offset", - "reach": "reachability/percent", - "rootdelay": "rootdelay/time_offset", - "rootdisp": "rootdisp/time_offset", - "runtime": "runtime/duration", - "stratum": "stratum/count", - "sysjitter": "sysjitter/time_offset", - "sysoffset": "sysoffset/time_offset", -} - - -""" -Peer metric types, used by both collectd & telegraf -""" -_peer_types = { - "backup": "peers/count-backup", - "excess": "peers/count-excess", - "false": "peers/count-false", - "invalid": "peers/count-invalid", - "outlier": "peers/count-outlier", - "pps": "peers/count-pps", - "survivor": "peers/count-survivor", - "sync": "peers/count-sync", -} - -""" -Metric types and suffixes for prometheus -""" -_prometheus_types = { - "frequency": (None, "_hertz", "Frequency error of the local clock"), - "offset": (None, "_seconds", "Mean clock offset of peers"), - "reach": ("%", "_ratio", "Peer reachability over the last 8 polls"), - "rootdelay": (None, "_seconds", "Network delay to stratum 0 sources"), - "rootdisp": (None, "_seconds", "Maximum calculated offset from stratum 0 sources"), - "runtime": (None, "_duration_seconds", "Duration NTP service has been running"), - "stratum": ("i", None, "NTP stratum of this server"), - "sysjitter": (None, "_seconds", "RMS average of most recent system peer offset differences"), - "sysoffset": (None, "_seconds", "Current clock offset of selected system peer"), -} - -""" -Metric types for telegraf -""" -_telegraf_types = { - "frequency": None, - "offset": None, - "reach": None, - "rootdelay": None, - "rootdisp": None, - "runtime": None, - "stratum": "i", - "sysjitter": None, - "sysoffset": None, -} - - class NTPAlerter(object): def __init__(self, checks): self.checks = checks self.mc = MetricClassifier(_metricdefs) self.metrics = {} self.objs = {} - self.prometheus_objs = {} - def collectmetrics(self, checkobjs, debug): + def alert(self, checkobjs: dict, output: outputs.Output, debug: bool = False) -> None: + """ + Produce the metrics + """ + self.collectmetrics(checkobjs=checkobjs) + self.mc.classify_metrics(self.metrics) + (m, rc) = self.mc.worst_metric(self.checks) + self.metrics["result"] = self.return_code() + output.send_summary_stats(self.metrics, debug) + output.send_peer_counts(self.metrics, debug) + + def collectmetrics(self, checkobjs: dict, debug: bool = False) -> None: """ Get metrics from each registered metric source and add all relevant aliases. """ @@ -192,112 +138,6 @@ def custom_message_sync(self, result): return "%s: Time is in sync with %s" % (result, self.objs["peers"].syncpeer()) return None - def alert(self, checkobjs, hostname, interval, format, telegraf_file, debug=False): - """ - Produce the metrics - """ - self.collectmetrics(checkobjs=checkobjs, debug=False) - self.mc.classify_metrics(self.metrics) - (m, rc) = self.mc.worst_metric(self.checks) - self.metrics["result"] = self.return_code() - if format == "collectd": - self.alert_collectd(hostname, interval) - elif format == "prometheus": - self.alert_prometheus(debug=debug) - elif format == "telegraf": - self.alert_telegraf(telegraf_file) - self.alert_peers(hostname, interval, format, telegraf_file, debug) - - def alert_collectd(self, hostname, interval): - """ - Produce collectd output for the metrics - """ - for metric in sorted(_collectdtypes.keys()): - if metric in self.metrics: - print( - 'PUTVAL "%s/ntpmon-%s" interval=%d N:%.9f' - % ( - hostname, - _collectdtypes[metric], - interval, - self.metrics[metric], - ) - ) - - def set_prometheus_metric(self, name, description, value, peertype=None): - import prometheus_client - - if name in self.prometheus_objs: - g = self.prometheus_objs[name] - if peertype is not None: - g = g.labels(peertype=peertype) - else: - if peertype is not None: - g = prometheus_client.Gauge(name, description, ["peertype"]) - self.prometheus_objs[name] = g - g = g.labels(peertype=peertype) - else: - g = prometheus_client.Gauge(name, description) - self.prometheus_objs[name] = g - g.set(value) - - def alert_prometheus(self, debug=False): - def emit_metric(name, description, metrictype, value, format): - if debug: - valuestr = format % (value,) - print("# HELP %s %s" % (name, description)) - print("# TYPE %s gauge" % (name,)) - print("%s %s" % (name, valuestr)) - else: - self.set_prometheus_metric(name, description, value) - - for metric in sorted(_prometheus_types.keys()): - if metric in self.metrics: - (metrictype, suffix, description) = _prometheus_types[metric] - s = "ntpmon_" + metric - if suffix is not None: - s += suffix - val = self.metrics[metric] - fmt = "%.9f" - if metrictype == "i": - fmt = "%d" - elif metrictype == "%": - val /= 100 - emit_metric(s, description, metrictype, val, fmt) - - def alert_telegraf(self, telegraf_file: TextIOWrapper): - telegraf_metrics = {k: self.metrics[k] for k in sorted(_telegraf_types.keys()) if k in self.metrics} - output = line_protocol.to_line_protocol(telegraf_metrics, "ntpmon") - print(output, file=telegraf_file) - - def alert_peers(self, hostname, interval, format, telegraf_file, debug=False): - if debug and format == "prometheus": - print("# TYPE ntpmon_peers gauge") - for metric in _peer_types: - value = self.metrics.get(metric) - if format == "collectd": - print( - 'PUTVAL "%s/ntpmon-%s" interval=%d N:%.9f' - % ( - hostname, - _peer_types[metric], - interval, - value, - ) - ) - elif format == "prometheus": - if debug: - print('ntpmon_peers{peertype="%s"} %d' % (metric, value)) - else: - self.set_prometheus_metric("ntpmon_peers", "NTP peer count", value, metric) - elif format == "telegraf": - telegraf_metrics = { - "count": value, - "peertype": metric, - } - output = line_protocol.to_line_protocol(telegraf_metrics, "ntpmon_peers") - print(output, file=telegraf_file) - def alert_nagios(self, checkobjs, debug): """ Produce nagios output for the metrics diff --git a/src/line_protocol.py b/src/line_protocol.py index 01b81fe..15e2a59 100755 --- a/src/line_protocol.py +++ b/src/line_protocol.py @@ -61,7 +61,10 @@ def to_line_protocol(metrics: dict, which: str, additional_tags: dict = {}) -> s timestamp = f" {seconds}{nanoseconds:09}" else: timestamp = "" - return f"{which},{format_tags(metrics, additional_tags)} {format_fields(metrics)}{timestamp}" + tags = format_tags(metrics, additional_tags) + if len(tags): + tags = "," + tags + return f"{which}{tags} {format_fields(metrics)}{timestamp}" def transform_identifier(id: str) -> str: diff --git a/src/ntpmon.py b/src/ntpmon.py index d0b4e0b..747f08b 100755 --- a/src/ntpmon.py +++ b/src/ntpmon.py @@ -10,20 +10,15 @@ import sys import time -from io import TextIOWrapper - import alert -import line_protocol +import outputs import peer_stats import process from tailer import Tailer -debug = sys.stdout.isatty() - - -def get_args(): +def get_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="NTPmon - NTP metrics monitor") parser.add_argument( "--mode", @@ -41,6 +36,17 @@ def get_args(): help="Connect string (in host:port format) to use when sending data to telegraf (default: 127.0.0.1:8094)", default="127.0.0.1:8094", ) + parser.add_argument( + "--debug", + action=argparse.BooleanOptionalAction, + help="Run in debug mode (default: True if standard output is a tty device)", + default=sys.stdout.isatty(), + ) + parser.add_argument( + "--hostname", + type=str, + help="The hostname to use for sending collectd metrics", + ) parser.add_argument( "--interval", type=int, @@ -65,46 +71,34 @@ def get_args(): default=9648, ) args = parser.parse_args() - return args + if "COLLECTD_INTERVAL" in os.environ: + if args.interval is None: + args.interval = float(os.environ["COLLECTD_INTERVAL"]) + if args.mode is None: + args.mode = "collectd" -def get_telegraf_file(connect: str) -> TextIOWrapper: - """Return a TextIOWrapper for writing data to telegraf""" - (host, port) = connect.split(":") - port = int(port) - s = socket.socket() - s.connect((host, port)) - return s.makefile(mode="w") + if "COLLECTD_HOSTNAME" in os.environ: + if args.hostname is None: + args.hostname = os.environ["COLLECTD_HOSTNAME"] + if args.mode is None: + args.mode = "collectd" + if args.hostname is None: + args.hostname = socket.getfqdn() -def get_time_until(interval): - now = time.time() - return interval - now % interval + if args.interval is None: + args.interval = 60 + return args -checkobjs = None +def get_time_until(interval: int) -> float: + now = time.time() + return interval - now % interval -async def alert_task(args: argparse.Namespace, hostname: str, telegraf: TextIOWrapper): - global checkobjs - checks = ["proc", "offset", "peers", "reach", "sync", "vars"] - alerter = alert.NTPAlerter(checks) - while True: - implementation = process.get_implementation() - if implementation: - # run the checks, returning their data - checkobjs = process.ntpchecks(checks, debug=False, implementation=implementation) - # alert on the data collected - alerter.alert( - checkobjs=checkobjs, - hostname=hostname, - interval=args.interval, - format=args.mode, - telegraf_file=telegraf, - debug=debug, - ) - await asyncio.sleep(get_time_until(args.interval)) +checkobjs = None def find_type(source: str, peerobjs: dict) -> str: @@ -120,11 +114,9 @@ def find_type(source: str, peerobjs: dict) -> str: return "UNKNOWN" -async def peer_stats_task(args: argparse.Namespace, telegraf: TextIOWrapper) -> None: - if args.mode != "telegraf": - # FIXME: add prometheus & collectd implementation - return - +async def peer_stats_task(args: argparse.Namespace, output: outputs.Output) -> None: + """Tail the peer stats log file and send the measurements to the selected output""" + global checkobjs implementation = None logfile = args.logfile tailer = None @@ -156,45 +148,30 @@ async def peer_stats_task(args: argparse.Namespace, telegraf: TextIOWrapper) -> if stats is not None: if "type" not in stats: stats["type"] = find_type(stats["source"], checkobjs["peers"].peers) - telegraf_line = line_protocol.to_line_protocol(stats, "ntpmon_peer") - print(telegraf_line, file=telegraf) - + output.send_measurement(stats, debug=args.debug) -async def start_tasks(args: argparse.Namespace, hostname: str, telegraf: TextIOWrapper) -> None: - alert = asyncio.create_task(alert_task(args, hostname, telegraf), name="alert") - stats = asyncio.create_task(peer_stats_task(args, telegraf), name="stats") - await asyncio.wait((alert, stats), return_when=asyncio.ALL_COMPLETED) - - -def main(): - args = get_args() - - if "COLLECTD_HOSTNAME" in os.environ and args.mode is None: - args.mode = "collectd" - hostname = os.environ["COLLECTD_HOSTNAME"] - else: - hostname = socket.getfqdn() - - if "COLLECTD_INTERVAL" in os.environ and args.mode is None: - args.mode = "collectd" - if args.interval is None: - args.interval = float(os.environ["COLLECTD_INTERVAL"]) - if args.interval is None: - args.interval = 60 +async def summary_stats_task(args: argparse.Namespace, output: outputs.Output) -> None: + global checkobjs + checks = ["proc", "offset", "peers", "reach", "sync", "vars"] + alerter = alert.NTPAlerter(checks) + while True: + implementation = process.get_implementation() + if implementation: + # run the checks, returning their data + checkobjs = process.ntpchecks(checks, debug=False, implementation=implementation) + # alert on the data collected + alerter.alert(checkobjs=checkobjs, output=output, debug=args.debug) - if not debug: - if args.mode == "telegraf": - telegraf_file = get_telegraf_file(args.connect) - elif args.mode == "prometheus": - import prometheus_client + await asyncio.sleep(get_time_until(args.interval)) - prometheus_client.start_http_server(addr=args.listen_address, port=args.port) - else: - telegraf_file = sys.stdout - asyncio.run(start_tasks(args, hostname, telegraf_file)) +async def start_tasks(args: argparse.Namespace) -> None: + output = outputs.get_output(args) + peer_stats = asyncio.create_task(peer_stats_task(args, output), name="peerstats") + summary_stats = asyncio.create_task(summary_stats_task(args, output), name="summarystats") + await asyncio.wait((peer_stats, summary_stats), return_when=asyncio.FIRST_COMPLETED) if __name__ == "__main__": - main() + asyncio.run(start_tasks(get_args())) diff --git a/src/outputs.py b/src/outputs.py new file mode 100644 index 0000000..49ed055 --- /dev/null +++ b/src/outputs.py @@ -0,0 +1,289 @@ +# +# Copyright: (c) 2023 Paul D. Gear +# License: AGPLv3 + + +import argparse +import socket +import sys + +from io import TextIOWrapper +from typing import ClassVar + + +import line_protocol + + +class Output: + + peertypes: ClassVar[dict[str, str]] = { + "backup": "peers/count-backup", + "excess": "peers/count-excess", + "false": "peers/count-false", + "invalid": "peers/count-invalid", + "outlier": "peers/count-outlier", + "pps": "peers/count-pps", + "survivor": "peers/count-survivor", + "sync": "peers/count-sync", + } + + peerstatstypes: ClassVar[dict[str, str]] = { + "authenticated": "authenticated/bool", + "authentication_enabled": "authentication-enabled/bool", + "authentication_fail": "authentication-fail/bool", + "bad_header": "bad-header/bool", + "bogus": "bogus/bool", + "broadcast": "broadcast/bool", + "delay": "delay/time_offset", + "dispersion": "dispersion/time_dispersion", + "duplicate": "duplicate/bool", + "exceeded_max_delay": "exceeded-max-delay/bool", + "exceeded_max_delay_dev_ratio": "exceeded-max-delay-dev-ratio/bool", + "exceeded_max_delay_ratio": "exceeded-max-delay-ratio/bool", + "frequency": "frequency/frequency_offset", + "interleaved": "interleaved/bool", + "invalid": "invalid/bool", + "jitter": "jitter/time_offset", + "leap": "leap/bool", + "local_poll": "local-poll/gauge", + "offset": "offset/time_offset", + "persistent": "persistent/bool", + "reachable": "reachable/bool", + "remote_poll": "remote-poll/gauge", + "root_delay": "rootdelay/root_delay", + "root_dispersion": "rootdisp/root_dispersion", + "score": "score/gauge", + "stratum": "stratum/clock_stratum", + "sync_loop": "sync-loop/bool", + "synchronized": "synchronized/bool", + } + + summarytypes: ClassVar[dict[str, str]] = { + "frequency": "frequency/frequency_offset", + "offset": "offset/time_offset", + "reach": "reachability/percent", + "rootdelay": "rootdelay/root_delay", + "rootdisp": "rootdisp/root_dispersion", + "runtime": "runtime/duration", + "stratum": "stratum/clock_stratum", + "sysjitter": "sysjitter/time_offset", + "sysoffset": "sysoffset/time_offset", + } + + def send_measurement(self, metrics: dict, debug: bool = False) -> None: + pass + + def send_peer_counts(self, metrics: dict, debug: bool = False) -> None: + pass + + def send_summary_stats(self, metrics: dict, debug: bool = False) -> None: + pass + + +class CollectdOutput(Output): + def __init__(self, args: argparse.Namespace) -> None: + self.args = args + + formatstr: ClassVar[str] = 'PUTVAL "%s/ntpmon-%s" interval=%d N:%.9f' + + def send_measurement(self, metrics: dict, debug: bool = False) -> None: + self.send_stats(metrics, self.peerstatstypes, hostname=metrics["source"], debug=debug) + + def send_peer_counts(self, metrics: dict, debug: bool = False) -> None: + self.send_stats(metrics, self.peertypes, debug=debug) + + def send_stats(self, metrics: dict, types: dict, debug: bool = False, hostname: str = None) -> None: + if hostname is None: + hostname = self.args.hostname + for metric in sorted(types.keys()): + if metric in metrics: + print(self.formatstr % (hostname, types[metric], self.args.interval, metrics[metric])) + + def send_summary_stats(self, metrics: dict, debug: bool = False) -> None: + self.send_stats(metrics, self.summarytypes, debug=debug) + + +class PrometheusOutput(Output): + def __init__(self, args: argparse.Namespace) -> None: + self.prometheus_objs = {} + import prometheus_client + + prometheus_client.start_http_server(addr=args.listen_address, port=args.port) + + peerstatslabels: ClassVar[list[str]] = [ + "mode", + "refid", + "rx_timestamp", + "source", + "tx_timestamp", + "type", + ] + + peerstatstypes: ClassVar[dict[str, str]] = { + "authenticated": ("i", None, "Whether the peer is authenticated"), + "authentication_enabled": ("i", None, "Whether the peer has authentication enabled"), + "authentication_fail": ("i", None, "Whether the peer has failed authentication"), + "bad_header": ("i", None, "Whether the peer has sent bad header data"), + "bogus": ("i", None, "Whether the peer has has been marked as bogus"), + "broadcast": ("i", None, "Whether the peer is a broadcast association"), + "delay": (None, "_seconds", "Network round trip delay to this peer"), + "dispersion": (None, "_seconds", "Calculated uncertainty for this peer"), + "duplicate": ("i", None, "Whether the peer's last response is a duplicate"), + "exceeded_max_delay": ("i", None, "Whether the peer has exceeded the chrony maximum delay"), + "exceeded_max_delay_dev_ratio": ("i", None, "Whether the peer has exceeded the chrony maximum delay dev ratio"), + "exceeded_max_delay_ratio": ("i", None, "Whether the peer has exceeded the chrony maximum delay ratio"), + "frequency": (None, "_hertz", "Estimated frequency error of this peer's clock"), + "interleaved": ("i", None, "Whether the peer has interleaving enabled"), + "invalid": ("i", None, "Whether the peer has failed validity checks"), + "jitter": (None, "_seconds", "RMS average of peer offset differences"), + "leap": ("i", None, "Whether the peer has asserted its leap indicator"), + "local_poll": ("i", None, "Rate at which local host polls this peer"), + "offset": (None, "_seconds", "Current clock offset of this peer"), + "persistent": ("i", None, "Whether the peer is configured as a persistent association"), + "reachable": ("i", None, "Whether the peer is reachable"), + "remote_poll": ("i", None, "The poll rate reported by this peer"), + "root_delay": (None, "_seconds", "The root delay reported by this peer"), + "root_dispersion": (None, "_seconds", "The root dispersion reported by this peer"), + "score": (None, None, "The chrony score calculated for this peer"), + "stratum": ("i", None, "The stratum reported by this peer"), + "sync_loop": ("i", None, "Whether a synchronization loop has been detected for this peer"), + "synchronized": ("i", None, "Whether the peer reports as synchronized"), + } + + summarystatstypes: ClassVar[dict[str, tuple[str, str, str]]] = { + "frequency": (None, "_hertz", "Frequency error of the local clock"), + "offset": (None, "_seconds", "Mean clock offset of peers"), + "reach": ("%", "_ratio", "Peer reachability over the last 8 polls"), + "rootdelay": (None, "_seconds", "Network delay to stratum 0 sources"), + "rootdisp": (None, "_seconds", "Maximum calculated uncertainty from stratum 0 sources"), + "runtime": (None, "_duration_seconds", "Duration NTP service has been running"), + "stratum": ("i", None, "NTP stratum of this server"), + "sysjitter": (None, "_seconds", "RMS average of most recent system peer offset differences"), + "sysoffset": (None, "_seconds", "Current clock offset of selected system peer"), + } + + def send_measurement(self, metrics: dict, debug: bool = False) -> None: + self.send_stats( + "ntpmon_peer", + metrics, + self.peerstatstypes, + [x for x in self.peerstatslabels if x in metrics], + [metrics[x] for x in self.peerstatslabels if x in metrics], + debug=debug, + ) + + def send_peer_counts(self, metrics: dict, debug: bool = False) -> None: + for metric in sorted(self.peertypes.keys()): + if metric in metrics: + self.set_prometheus_metric( + "ntpmon_peers", + "NTP peer count", + metrics[metric], + "%d", + labelnames=["peertype"], + labels=[metric], + debug=debug, + ) + + def send_summary_stats(self, metrics: dict, debug: bool = False) -> None: + self.send_stats("ntpmon", metrics, self.summarystatstypes, debug=debug) + + def send_stats( + self, + prefix: str, + metrics: dict, + metrictypes: dict, + labelnames: list[str] = [], + labels: list[str] = [], + debug: bool = False, + ) -> None: + for metric in sorted(metrictypes.keys()): + if metric in metrics: + (datatype, suffix, description) = metrictypes[metric] + name = prefix + "_" + line_protocol.transform_identifier(metric) + if suffix is not None: + name += suffix + value = metrics[metric] + fmt = "%.9f" + if datatype == "i": + fmt = "%d" + elif datatype == "%": + value /= 100 + self.set_prometheus_metric(name, description, value, fmt, labelnames, labels, debug=debug) + + def set_prometheus_metric( + self, + name: str, + description: str, + value: float, + fmt: str, + labelnames: list[str], + labels: list[str], + debug: bool = False, + ) -> None: + import prometheus_client + + if debug: + print("# HELP %s %s" % (name, description)) + print("# TYPE %s gauge" % (name,)) + labelstr = ",".join([k + "=\"" + v + "\"" for k, v in zip(labelnames, labels)]) + if len(labelstr): + labelstr = "{" + labelstr + "}" + valuestr = fmt % (value,) + print("%s%s %s" % (name, labelstr, valuestr)) + return + + if name not in self.prometheus_objs: + g = prometheus_client.Gauge(name, description, labelnames) + self.prometheus_objs[name] = g + else: + g = self.prometheus_objs[name] + + if len(labels): + g = g.labels(*labels) + + g.set(value) + + +class TelegrafOutput(Output): + def __init__(self, args: argparse.Namespace) -> None: + super().__init__() + self.file = sys.stdout if args.debug else self.get_telegraf_file(args.connect) + + @classmethod + def get_telegraf_file(connect: str) -> TextIOWrapper: + """Return a TextIOWrapper for writing data to telegraf""" + (host, port) = connect.split(":") + port = int(port) + s = socket.socket() + s.connect((host, port)) + return s.makefile(mode="w") + + def send_measurement(self, metrics: dict, debug: bool = False) -> None: + telegraf_line = line_protocol.to_line_protocol(metrics, "ntpmon_peer") + print(telegraf_line, file=self.file) + + def send_peer_counts(self, metrics: dict, debug: bool = False) -> None: + for metric in sorted(self.peertypes.keys()): + telegraf_metrics = { + "count": metrics[metric], + "peertype": metric, + } + output = line_protocol.to_line_protocol(telegraf_metrics, "ntpmon_peers") + print(output, file=self.file) + + def send_summary_stats(self, metrics: dict, debug: bool = False) -> None: + telegraf_metrics = {k: metrics[k] for k in sorted(self.summarytypes.keys()) if k in metrics} + telegraf_line = line_protocol.to_line_protocol(telegraf_metrics, "ntpmon") + print(telegraf_line, file=self.file) + + +def get_output(args: argparse.Namespace) -> Output: + if args.mode == "collectd": + return CollectdOutput(args) + elif args.mode == "prometheus": + return PrometheusOutput(args) + elif args.mode == "telegraf": + return TelegrafOutput(args) + else: + raise ValueError("Unknown output mode") diff --git a/src/peer_stats.py b/src/peer_stats.py index eef4071..a5011c3 100755 --- a/src/peer_stats.py +++ b/src/peer_stats.py @@ -74,26 +74,26 @@ def extract_chrony_measurements(f: list[str]) -> dict: "duplicate": checkfail(f[5][0]), "bogus": checkfail(f[5][1]), "invalid": checkfail(f[5][2]), - "authentication-fail": checkfail(f[6][0]), + "authentication_fail": checkfail(f[6][0]), "synchronized": int(f[6][1]), - "bad-header": checkfail(f[6][2]), - "exceeded-max-delay": checkfail(f[7][0]), - "exceeded-max-delay-ratio": checkfail(f[7][1]), - "exceeded-max-delay-dev-ratio": checkfail(f[7][2]), - "sync-loop": checkfail(f[7][3]), - "local-poll": int(f[8]), - "remote-poll": int(f[9]), + "bad_header": checkfail(f[6][2]), + "exceeded_max_delay": checkfail(f[7][0]), + "exceeded_max_delay_ratio": checkfail(f[7][1]), + "exceeded_max_delay_dev_ratio": checkfail(f[7][2]), + "sync_loop": checkfail(f[7][3]), + "local_poll": int(f[8]), + "remote_poll": int(f[9]), "score": float(f[10]), "offset": float(f[11]), "delay": float(f[12]), "dispersion": float(f[13]), - "root-delay": float(f[14]), - "root-dispersion": float(f[15]), + "root_delay": float(f[14]), + "root_dispersion": float(f[15]), "refid": f[16], "mode": modes.get(f[17][0], "UNKNOWN"), "interleaved": 1 if f[17][1] == "I" else 0, - "tx-timestamp": timestamp_sources.get(f[18], "UNKNOWN"), - "rx-timestamp": timestamp_sources.get(f[19], "UNKNOWN"), + "tx_timestamp": timestamp_sources.get(f[18], "UNKNOWN"), + "rx_timestamp": timestamp_sources.get(f[19], "UNKNOWN"), } @@ -125,12 +125,12 @@ def extract_chrony_statistics(f: list[str]) -> dict: "source": f[2], "stdev": float(f[3]), "offset": float(f[4]), - "stdev-est": float(f[5]), + "stdev_est": float(f[5]), "skew": float(f[6]), "freq": float(f[7]), "stress": float(f[8]), "samples": int(f[9]), - "begin-sample": int(f[10]), + "begin_sample": int(f[10]), "runs": int(f[11]), "asymmetry": float(f[12]), } @@ -165,12 +165,12 @@ def extract_chrony_tracking(f: list[str]) -> dict: "skew": float(f[5]), "offset": float(f[6]), "leap": leapcodes.get(f[7], -1), - "num-combined": int(f[8]), + "num_combined": int(f[8]), "stdev": float(f[9]), - "remaining-correction": float(f[10]), - "root-delay": float(f[11]), - "root-dispersion": float(f[12]), - "max-error": float(f[13]), + "remaining_correction": float(f[10]), + "root_delay": float(f[11]), + "root_dispersion": float(f[12]), + "max_error": float(f[13]), } @@ -224,7 +224,7 @@ def extract_ntpd_status_word(status: str) -> dict: "broadcast": bool(status_word & 0x08), "reachable": bool(status_word & 0x10), "authenticated": bool(status_word & 0x20), - "authentication-enabled": bool(status_word & 0x40), + "authentication_enabled": bool(status_word & 0x40), "persistent": bool(status_word & 0x80), "type": select_field[status_word & 0x07], } diff --git a/unit_tests/test_peer_stats.py b/unit_tests/test_peer_stats.py index 2fec412..d7065d2 100644 --- a/unit_tests/test_peer_stats.py +++ b/unit_tests/test_peer_stats.py @@ -39,7 +39,7 @@ def test_parse_chrony_measurements() -> None: assert measurements[3]["offset"] > measurements[4]["offset"] assert measurements[4]["datetime"] == datetime.datetime(2021, 12, 30, 11, 28, 49, tzinfo=datetime.timezone.utc) assert measurements[5]["score"] == 0.01 - assert bool(measurements[5]["exceeded-max-delay-dev-ratio"]) + assert bool(measurements[5]["exceeded_max_delay_dev_ratio"]) peerstats = """ From 4fdcdfc4065e8c4c03de19429338466126c4cacd Mon Sep 17 00:00:00 2001 From: Paul Gear Date: Wed, 27 Dec 2023 18:01:31 +1000 Subject: [PATCH 3/3] Reformat --- src/outputs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/outputs.py b/src/outputs.py index 49ed055..fcc329a 100644 --- a/src/outputs.py +++ b/src/outputs.py @@ -226,7 +226,7 @@ def set_prometheus_metric( if debug: print("# HELP %s %s" % (name, description)) print("# TYPE %s gauge" % (name,)) - labelstr = ",".join([k + "=\"" + v + "\"" for k, v in zip(labelnames, labels)]) + labelstr = ",".join([k + '="' + v + '"' for k, v in zip(labelnames, labels)]) if len(labelstr): labelstr = "{" + labelstr + "}" valuestr = fmt % (value,)