query_and_submit.py
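"""Query Grafana Loki via logcli for the previous full hour and push the
results to configured webhook endpoints.

Endpoint definitions are read from /opt/config.yaml (an illustrative schema
is sketched above load_endpoints below). Expects the METRICS_WEBHOOK_TOKEN
environment variable to be set and a logcli binary at /logcli-linux-amd64.
"""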
import datetime as dt
import json
import os
import subprocess
from dataclasses import dataclass, field

import requests
import yaml

@dataclass
class Config:
    """Time interval bounds and credentials for a single hourly run."""

    interval_start: str = ""
    interval_end: str = ""
    interval_end_excl: str = ""
    # Read at instantiation time so the token reflects the current environment.
    metrics_token: str = field(
        default_factory=lambda: os.environ.get("METRICS_WEBHOOK_TOKEN", "")
    )

    def __post_init__(self):
        # The formatted timestamps carry a "Z" suffix, so build them from UTC
        # rather than local time.
        now = dt.datetime.now(dt.timezone.utc)
        one_hour_ago = now - dt.timedelta(hours=1)
        self.interval_start = one_hour_ago.strftime("%Y-%m-%dT%H:00:00Z")
        self.interval_end = one_hour_ago.strftime("%Y-%m-%dT%H:59:59Z")
        self.interval_end_excl = now.strftime("%Y-%m-%dT%H:00:00Z")
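
# Illustrative values for a run at 2024-06-01T14:05Z: interval_start and
# interval_end bound the previous hour inclusively ("2024-06-01T13:00:00Z"
# through "2024-06-01T13:59:59Z"), while interval_end_excl is the exclusive
# bound used for range queries ("2024-06-01T14:00:00Z").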

def load_endpoints(filename):
    """Load the endpoint definitions from a YAML config file."""
    with open(filename) as f:
        return yaml.safe_load(f)
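
# Illustrative shape of /opt/config.yaml, inferred from the fields this script
# reads; the URLs, queries, and label names below are hypothetical placeholders.
#
# - type: metricCounts
#   endpoint: https://example.invalid/api/hourly-counts
#   queries:
#     loginCount: '{app="example"} |= "user logged in"'
# - type: rowsFromLabels
#   endpoint: https://example.invalid/api/rows
#   query: '{app="example", event="signup"}'
#   labelMapping:
#     user_id: UserId
#   timestampMapping: Timestamp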

def process_endpoints(endpoints, config):
    for endpoint in endpoints:
        process_endpoint(endpoint, config)


def process_endpoint(endpoint, config):
    # Dispatch on the endpoint type, run the query, and push the result.
    query_function = QUERY_FUNCTIONS_BY_TYPE[endpoint["type"]]
    data = query_function(endpoint, config)
    post_data(endpoint["endpoint"], config, data)

def query_metric_counts(endpoint, config):
    """Run one LogQL instant query per metric and return the hourly counts."""
    metrics = {}
    for metric_name, query in endpoint["queries"].items():
        output = json.loads(
            subprocess.check_output(
                [
                    "/logcli-linux-amd64",
                    "instant-query",
                    "--now",
                    config.interval_end,
                    f"sum(count_over_time({query} [1h]))",
                ]
            )
        )
        # Each sample's "value" pair is [timestamp, count]; fall back to 0
        # when the query returns no samples.
        metrics[metric_name] = output[0]["value"][1] if output else 0
    return {
        "startInterval": config.interval_start,
        "endInterval": config.interval_end,
        **metrics,
    }

def query_rows_from_labels(endpoint, config):
    """Run a LogQL range query and map selected labels to one row per log line."""
    query = endpoint["query"]
    intervals = {
        "startInterval": config.interval_start,
        "endInterval": config.interval_end,
    }
    label_mapping = endpoint["labelMapping"]
    timestamp_mapping = endpoint.get("timestampMapping")
    logcli_output = subprocess.check_output(
        [
            "/logcli-linux-amd64",
            "query",
            "--quiet",
            "--output=jsonl",
            "--limit=500000",
            "--from",
            config.interval_start,
            "--to",
            config.interval_end_excl,
            *[f"--include-label={label}" for label in label_mapping],
            # Blank out the log line itself; only labels and timestamps are needed.
            f'{query} | line_format ""',
        ]
    )
    data = []
    for line in logcli_output.decode("utf-8").split("\n"):
        if not line:
            continue
        row = json.loads(line)
        timestamp = {timestamp_mapping: row["timestamp"]} if timestamp_mapping else {}
        data.append(
            {
                # Rename each kept label to its target column name.
                **{
                    label_mapping[label]: value
                    for label, value in row["labels"].items()
                    if label in label_mapping
                },
                **timestamp,
                **intervals,
            }
        )
    return data

QUERY_FUNCTIONS_BY_TYPE = {
    "metricCounts": query_metric_counts,
    "rowsFromLabels": query_rows_from_labels,
}

def post_data(endpoint, config, data):
    """POST the collected rows to the metrics endpoint, failing loudly on errors."""
    response = requests.post(
        url=endpoint,
        json=data,
        headers={
            "xc-token": config.metrics_token,
        },
    )
    response.raise_for_status()

if __name__ == "__main__":
    config = Config()
    endpoints = load_endpoints("/opt/config.yaml")
    process_endpoints(endpoints, config)