forked from info-beamer/36c3-cms
-
Notifications
You must be signed in to change notification settings - Fork 2
/
notifier.py
83 lines (64 loc) · 2.48 KB
/
notifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from json import dumps
from logging import getLogger
import paho.mqtt.client as mqtt
from flask import url_for
from requests import post
from conf import CONFIG
from util import cached_asset_name
LOG = getLogger("Notifier")
class Notifier:
def __init__(self):
self.config = CONFIG["NOTIFIER"]
LOG.debug(f"init {self.config=}")
self.mqtt = None
if self.config.get("MQTT_HOST"):
self.mqtt = mqtt.Client()
if self.config.get("MQTT_USERNAME") and self.config.get("MQTT_PASSWORD"):
self.mqtt.username_pw_set(
self.config["MQTT_USERNAME"], self.config["MQTT_PASSWORD"]
)
def message(self, message, level="INFO", component=None, asset=None):
LOG.debug(f"{message=} {level=} {component=}")
if self.mqtt:
try:
self._mqtt_message(message, level, component)
except Exception:
LOG.exception("could not send mqtt message")
for ntfy_url in self.config.get("NTFY", set()):
try:
self._ntfy_message(ntfy_url, message, asset)
except Exception:
LOG.exception(f"ntfy url {ntfy_url} failed sending")
def _mqtt_message(self, message, level, component_suffix):
assert self.mqtt is not None
LOG.info("sending mqtt message")
component = "infobeamer-cms"
if component_suffix is not None:
component = f"{component}/{component_suffix}"
payload = {
"level": level,
"component": component,
"msg": message,
}
LOG.info(f"mqtt payload is {payload!r}")
self.mqtt.connect(self.config["MQTT_HOST"])
self.mqtt.publish(self.config["MQTT_TOPIC"], dumps(payload))
self.mqtt.disconnect()
LOG.info("sent mqtt message")
def _ntfy_message(self, ntfy_url, message, asset):
LOG.info(f"sending alert to {ntfy_url} with message {message!r}")
headers = {}
if asset is not None:
headers["Click"] = url_for(
"content_moderate", asset_id=asset.id, _external=True
)
headers["Attach"] = url_for(
"static", filename=cached_asset_name(asset), _external=True
)
r = post(
ntfy_url,
data=str(message).encode("utf-8"),
headers=headers,
)
r.raise_for_status()
LOG.info(f"ntfy url {ntfy_url} returned {r.status_code}")