diff --git a/AUTHORS b/AUTHORS index b6d378e..d514346 100644 --- a/AUTHORS +++ b/AUTHORS @@ -1 +1,4 @@ +Flavio Fernandes Flavio Fernandes +Norman Rasmussen +Your Name diff --git a/ChangeLog b/ChangeLog index 59a0fff..73316b6 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,4 +1,24 @@ CHANGES ======= +* aiomqtt: peg requirement to 1.x +* aiomqtt: peg requirement to 1.2.1 or older +* data/config.yaml.flaviof + +v0.0.2 +------ + +* emeter: publish key and values as topics of emeter (#12) +* logging: use sdtout as last resort (#11) +* Add support for publishing emeter information + +v0.0.1 +------ + +* config: add support for qos and retain in mqtt +* add: kitchen-clock +* Sync up with https://github.com/clmcavaney/mqtt2kasa +* Make it python 3.7 compatible +* Add toggle support and better tests +* Trivial: New device to flaviof config and strictyaml to TODO * first commit diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..217fdf6 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.13-rc-bookworm + +COPY . /src + +RUN apt-get update && \ + apt-get install -y git curl build-essential gcc make + +RUN curl https://sh.rustup.rs -sSf | bash -s -- -y + +ENV PATH="/root/.cargo/bin:${PATH}" + +RUN python3 -m pip install cffi +RUN python3 -m pip install --no-cache-dir git+https://github.com/sbtinstruments/aiomqtt +RUN python3 -m pip install --no-cache-dir -r /src/requirements.txt +WORKDIR /src +RUN python3 setup.py install + +ENTRYPOINT ["python3", "/src/mqtt2kasa/main.py", "/src/data/config.yaml"] \ No newline at end of file diff --git a/build/lib/mqtt2kasa/__init__.py b/build/lib/mqtt2kasa/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/mqtt2kasa/bin/create-env.sh b/build/lib/mqtt2kasa/bin/create-env.sh new file mode 100644 index 0000000..5868ca9 --- /dev/null +++ b/build/lib/mqtt2kasa/bin/create-env.sh @@ -0,0 +1,21 @@ +#!/bin/bash +set -o errexit +set -o xtrace + +cd "$(dirname $0)" +BIN_DIR="${PWD}" +PROG_DIR="${BIN_DIR%/*}" +TOP_DIR="${PROG_DIR%/*}" + +pushd ${TOP_DIR} +if [ ! -e ./env ]; then + #virtualenv --system-site-packages env + python3 -m venv --copies env +fi +source ./env/bin/activate +#pip install --upgrade pip setuptools +pip install --ignore-installed -r ./requirements.txt +deactivate + +popd +exit 0 diff --git a/build/lib/mqtt2kasa/bin/mqtt2kasa.service.rpi b/build/lib/mqtt2kasa/bin/mqtt2kasa.service.rpi new file mode 100644 index 0000000..bb40298 --- /dev/null +++ b/build/lib/mqtt2kasa/bin/mqtt2kasa.service.rpi @@ -0,0 +1,11 @@ +[Unit] +Description=MQTT front end wrapper to python-kasa + +[Service] +User=pi +Type=simple +ExecStart=/home/pi/mqtt2kasa.git/mqtt2kasa/bin/start_mqtt2kasa.sh +Restart=on-failure + +[Install] +WantedBy=multi-user.target diff --git a/build/lib/mqtt2kasa/bin/mqtt2kasa.service.vagrant b/build/lib/mqtt2kasa/bin/mqtt2kasa.service.vagrant new file mode 100644 index 0000000..60649ae --- /dev/null +++ b/build/lib/mqtt2kasa/bin/mqtt2kasa.service.vagrant @@ -0,0 +1,11 @@ +[Unit] +Description=MQTT front end wrapper to python-kasa + +[Service] +User=vagrant +Type=simple +ExecStart=/vagrant/mqtt2kasa/bin/start_mqtt2kasa.sh /home/vagrant/mqtt2kasa.config.yaml +Restart=on-failure + +[Install] +WantedBy=multi-user.target diff --git a/build/lib/mqtt2kasa/bin/reload_config.sh b/build/lib/mqtt2kasa/bin/reload_config.sh new file mode 100644 index 0000000..e303351 --- /dev/null +++ b/build/lib/mqtt2kasa/bin/reload_config.sh @@ -0,0 +1,2 @@ +#!/bin/bash +sudo systemctl restart mqtt2kasa diff --git a/build/lib/mqtt2kasa/bin/start_mqtt2kasa.sh b/build/lib/mqtt2kasa/bin/start_mqtt2kasa.sh new file mode 100644 index 0000000..923c0a2 --- /dev/null +++ b/build/lib/mqtt2kasa/bin/start_mqtt2kasa.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +set -o errexit +#set -x + +cd "$(dirname $0)" +BIN_DIR="${PWD}" +PROG_DIR="${BIN_DIR%/*}" +TOP_DIR="${PROG_DIR%/*}" + +cd ${TOP_DIR}/env +source ./bin/activate +export PYTHONPATH=${PYTHONPATH:-$TOP_DIR} +cd ${PROG_DIR} && ./main.py $@ + +exit 0 diff --git a/build/lib/mqtt2kasa/bin/tail_log.sh b/build/lib/mqtt2kasa/bin/tail_log.sh new file mode 100644 index 0000000..beca362 --- /dev/null +++ b/build/lib/mqtt2kasa/bin/tail_log.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +if [ -z "$1" ]; then + sudo journalctl -u mqtt2kasa.service --no-pager --follow +else + sudo tail -F /var/log/syslog | grep mqtt2kasa +fi diff --git a/build/lib/mqtt2kasa/config.py b/build/lib/mqtt2kasa/config.py new file mode 100644 index 0000000..28aff8b --- /dev/null +++ b/build/lib/mqtt2kasa/config.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python +import collections +import os +import sys +from collections import namedtuple + +import yaml + +from mqtt2kasa import const +from mqtt2kasa import log + +CFG_FILENAME = os.path.dirname(os.path.abspath(const.__file__)) + "/../data/config.yaml" +Info = namedtuple("Info", "mqtt knobs cfg_globals locations keep_alives raw_cfg") + + +class Cfg: + _info = None # class (or static) variable + + def __init__(self): + pass + + @property + def mqtt_host(self): + attr = self._get_info().mqtt + if isinstance(attr, collections.abc.Mapping): + return attr.get("host", const.MQTT_DEFAULT_BROKER_IP) + return const.MQTT_DEFAULT_BROKER_IP + + @property + def mqtt_client_id(self): + attr = self._get_info().mqtt + if isinstance(attr, collections.abc.Mapping): + return attr.get("client_id", const.MQTT_DEFAULT_CLIENT_ID) + return const.MQTT_DEFAULT_CLIENT_ID + + @property + def mqtt_username(self): + attr = self._get_info().mqtt + if isinstance(attr, collections.abc.Mapping): + return attr.get("username", None) + return None + + @property + def mqtt_password(self): + attr = self._get_info().mqtt + if isinstance(attr, collections.abc.Mapping): + return attr.get("password", None) + return None + + @property + def mqtt_retain(self): + attr = self._get_info().mqtt + if isinstance(attr, collections.abc.Mapping): + return attr.get("retain", False) + return False + + @property + def mqtt_qos(self): + attr = self._get_info().mqtt + if isinstance(attr, collections.abc.Mapping): + return attr.get("qos", 0) + return 0 + + @property + def reconnect_interval(self): + attr = self._get_info().mqtt + if isinstance(attr, collections.abc.Mapping): + return attr.get("reconnect_interval", const.MQTT_DEFAULT_RECONNECT_INTERVAL) + return const.MQTT_DEFAULT_RECONNECT_INTERVAL + + @property + def knobs(self): + return self._get_info().knobs + + def mqtt_topic(self, location_name): + locations = self._get_info().locations + if isinstance(locations, collections.abc.Mapping): + location_attributes = locations.get(location_name, {}) + if location_attributes.get("topic"): + return location_attributes["topic"].format(location_name) + cfg_globals = self._get_info().cfg_globals + topic_format = cfg_globals.get("topic_format") + return ( + topic_format.format(location_name) + if topic_format + else const.MQTT_DEFAULT_CLIENT_TOPIC_FORMAT.format(location_name) + ) + + @property + def keep_alive_task_interval(self): + cfg_globals = self._get_info().cfg_globals + return float( + cfg_globals.get("keep_alive_task_interval") + or const.KEEP_ALIVE_DEFAULT_TASK_INTERVAL + ) + + def poll_interval(self, location_name): + locations = self._get_info().locations + if isinstance(locations, collections.abc.Mapping): + location_attributes = locations.get(location_name, {}) + if location_attributes.get("poll_interval"): + return float(location_attributes["poll_interval"]) + cfg_globals = self._get_info().cfg_globals + return float( + cfg_globals.get("poll_interval") or const.KASA_DEFAULT_POLL_INTERVAL + ) + + def emeter_poll_interval(self, location_name): + locations = self._get_info().locations + if isinstance(locations, collections.abc.Mapping): + location_attributes = locations.get(location_name, {}) + if location_attributes.get("emeter_poll_interval"): + return float(location_attributes["emeter_poll_interval"]) + cfg_globals = self._get_info().cfg_globals + return float( + cfg_globals.get("emeter_poll_interval") + or const.KASA_DEFAULT_EMETER_POLL_INTERVAL + ) + + @property + def locations(self): + return self._get_info().locations + + @property + def keep_alives(self): + return self._get_info().keep_alives + + @classmethod + def _get_config_filename(cls): + if len(sys.argv) > 1: + return sys.argv[1] + return CFG_FILENAME + + @classmethod + def _get_info(cls): + if not cls._info: + config_filename = cls._get_config_filename() + logger.info("loading yaml config file %s", config_filename) + with open(config_filename, "r") as ymlfile: + raw_cfg = yaml.safe_load(ymlfile) + cls._parse_raw_cfg(raw_cfg) + return cls._info + + @classmethod + def _parse_raw_cfg(cls, raw_cfg): + cfg_globals = raw_cfg.get("globals", {}) + assert isinstance(cfg_globals, dict) + locations = raw_cfg.get("locations") + assert isinstance(locations, dict) + keep_alives = raw_cfg.get("keep_alives", {}) + assert isinstance(keep_alives, dict) + + cls._info = Info( + raw_cfg.get("mqtt"), + raw_cfg.get("knobs", {}), + cfg_globals, + locations, + keep_alives, + raw_cfg, + ) + + +# ============================================================================= + + +logger = log.getLogger() +if __name__ == "__main__": + log.initLogger() + c = Cfg() + logger.info("c.knobs: {}".format(c.knobs)) + logger.info("c.mqtt_host: {}".format(c.mqtt_host)) + logger.info("c.cfg_globals: {}".format(c.cfg_globals)) + logger.info("c.locations: {}".format(c.locations)) + logger.info("c.keep_alives: {}".format(c.keep_alives)) diff --git a/build/lib/mqtt2kasa/const.py b/build/lib/mqtt2kasa/const.py new file mode 100644 index 0000000..b1ef9de --- /dev/null +++ b/build/lib/mqtt2kasa/const.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python + +MQTT_DEFAULT_CLIENT_ID = "mqtt2kasa" +MQTT_DEFAULT_CLIENT_TOPIC_FORMAT = "/kasa/device/{}" +MQTT_DEFAULT_BROKER_IP = "192.168.10.238" +MQTT_DEFAULT_RECONNECT_INTERVAL = 13 # [seconds] +KASA_DEFAULT_POLL_INTERVAL = 10 # [seconds] +KASA_DEFAULT_EMETER_POLL_INTERVAL = 0 # [seconds] 0 == disabled +KEEP_ALIVE_DEFAULT_TASK_INTERVAL = 1.5 # [seconds] diff --git a/build/lib/mqtt2kasa/events.py b/build/lib/mqtt2kasa/events.py new file mode 100644 index 0000000..2d7cf4c --- /dev/null +++ b/build/lib/mqtt2kasa/events.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python +from collections import namedtuple + + +class BaseEvent: + def __init__(self, expected_attrs, attrs): + self.event = self.__class__.__name__ + self.attrs = self._dict_to_attrs(attrs) + self._check_expected_attrs(expected_attrs) + + def __getattr__(self, attr): + try: + return getattr(self.attrs, attr) + except AttributeError as e: + raise AttributeError( + f"{self.event} object is missing {attr} attribute" + ) from e + + def _check_expected_attrs(self, expected_attrs): + if expected_attrs: + for attr in expected_attrs: + getattr(self, attr) + + @staticmethod + def _dict_to_attrs(params_dict): + cls = namedtuple("Attrs", params_dict) + cls.__new__.__defaults__ = tuple(params_dict.values()) + return cls() + + +class MqttMsgEvent(BaseEvent): + def __init__(self, **attrs): + expected_attrs = "topic", "payload" + super().__init__(expected_attrs, attrs) + + +class KasaStateEvent(BaseEvent): + def __init__(self, **attrs): + expected_attrs = "name", "state" + super().__init__(expected_attrs, attrs) + + +class KasaEmeterEvent(BaseEvent): + def __init__(self, **attrs): + expected_attrs = "name", "emeter_status" + super().__init__(expected_attrs, attrs) diff --git a/build/lib/mqtt2kasa/kasa_wrapper.py b/build/lib/mqtt2kasa/kasa_wrapper.py new file mode 100644 index 0000000..ff7096e --- /dev/null +++ b/build/lib/mqtt2kasa/kasa_wrapper.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python +import asyncio +import random +from typing import Optional + +from asyncio_throttle import Throttler +from kasa import Discover, EmeterStatus +from kasa.smartdevice import SmartDevice, SmartDeviceException + +from mqtt2kasa import log +from mqtt2kasa.config import Cfg +from mqtt2kasa.events import KasaStateEvent, KasaEmeterEvent + +logger = log.getLogger() + + +class Kasa: + STATE_ON = "on" + STATE_OFF = "off" + + _discovered_devices = None + + def __init__(self, name: str, topic: str, config: dict): + self.name = name + self.topic = topic + self.host = config.get("host") + self.alias = config.get("alias") + self.poll_interval = Cfg().poll_interval(name) + self.emeter_poll_interval = Cfg().emeter_poll_interval(name) + self.recv_q = asyncio.Queue(maxsize=4) + self.throttler = Throttler(rate_limit=4, period=60) + self.curr_state = None + self._device = None + assert self.host or self.alias + + async def _get_device(self) -> SmartDevice: + if not self._device: + if self.host: + self._device = await Discover.discover_single(self.host) + self.alias = self._device.alias + else: + self.host, self._device = await self._find_by_alias( + self.name, self.alias + ) + logger.info( + f"Discovered {self.host} alias:'{self._device.alias}'" + f" model:{self._device.model}" + f" mac:{self._device.mac}" + ) + return self._device + + @property + def started(self): + return self._device and isinstance(self.curr_state, bool) + + @classmethod + async def _find_by_alias(cls, name, alias, retry=0): + if not cls._discovered_devices: + cls._discovered_devices = await Discover.discover() + try: + for addr, device in cls._discovered_devices.items(): + await device.update() + if device.alias == alias: + return addr, device + except SmartDeviceException as e: + logger.warning( + f"Discovering device with alias {alias} did not go well: {e}" + ) + + if retry < 3: + cls._discovered_devices = None + return await cls._find_by_alias(name, alias, retry + 1) + raise RuntimeError(f"Unable to locate {name} from alias {alias}") + + @property + async def is_on(self) -> Optional[bool]: + try: + device = await self._get_device() + await device.update() + return device.is_on + except SmartDeviceException as e: + logger.error(f"{self.host} unable to fetch is_on: {e}") + # implicit return None + + async def turn_on(self): + async with self.throttler: + try: + device = await self._get_device() + await device.turn_on() + self.curr_state = True + except SmartDeviceException as e: + logger.error(f"{self.host} unable to turn_on: {e}") + + async def turn_off(self): + async with self.throttler: + try: + device = await self._get_device() + await device.turn_off() + self.curr_state = False + except SmartDeviceException as e: + logger.error(f"{self.host} unable to turn_off: {e}") + + @property + async def has_emeter(self) -> Optional[bool]: + try: + device = await self._get_device() + await device.update() + return device.has_emeter + except SmartDeviceException as e: + logger.error(f"{self.host} unable to get has_emeter: {e}") + # implicit return None + + @property + async def emeter_realtime(self) -> Optional[EmeterStatus]: + try: + device = await self._get_device() + await device.update() + return device.emeter_realtime + except SmartDeviceException as e: + logger.error(f"{self.host} unable to fetch emeter: {e}") + # implicit return None + + @classmethod + def state_from_name(cls, is_on: Optional[str]) -> bool: + return is_on == cls.STATE_ON + + @classmethod + def state_name(cls, is_on: Optional[bool]) -> str: + if is_on is None: + return "¯\\_(ツ)_/¯" + return cls.STATE_ON if is_on else cls.STATE_OFF + + @staticmethod + def state_is_toggle(is_toggle: str) -> bool: + return is_toggle.lower() in ("toggle", "flip", "other", "change", "reverse") + + @classmethod + def state_is_on(cls, is_on: str) -> bool: + return is_on.lower() in ( + cls.STATE_ON, + "yes", + "1", + "go", + "yeah", + "yay", + "woot", + ) + + @classmethod + def state_is_off(cls, is_off: str) -> bool: + return is_off.lower() in ( + cls.STATE_OFF, + "no", + "0", + "stop", + "boo", + "nay", + "nuke", + ) + + def state_parse(self, payload: str) -> (str, bool): + if payload in (self.STATE_ON, self.STATE_OFF): + return None, self.state_from_name(payload) + if self.state_is_toggle(payload): + new_state = False if self.curr_state else True + return self.state_name(new_state), new_state + if self.state_is_on(payload): + return self.STATE_ON, True + if self.state_is_off(payload): + return self.STATE_OFF, False + raise ValueError(f"cannot translate {payload}") + + +async def handle_kasa_poller(kasa: Kasa, main_events_q: asyncio.Queue): + fails = 0 + while True: + # chatty + # logger.debug(f"Polling {kasa.name} now. Interval is {kasa.poll_interval} seconds") + new_state = await kasa.is_on + if kasa.curr_state != new_state or fails: + if new_state is None: + fails += 1 + logger.error(f"Polling {kasa.name} ({kasa.host}) failed {fails} times") + else: + fails = 0 + await main_events_q.put( + KasaStateEvent( + name=kasa.name, state=new_state, old_state=kasa.curr_state + ) + ) + kasa.curr_state = new_state + await _sleep_with_jitter(kasa.poll_interval) + + +async def handle_kasa_emeter_poller(kasa: Kasa, main_events_q: asyncio.Queue): + fails = 0 + while True: + # chatty + # logger.debug(f"Polling {kasa.name} emeter now. Interval is {kasa.emeter_poll_interval} seconds") + if await kasa.has_emeter == False: + logger.info(f"{kasa.name} has no emeter. no emeter polling is needed") + break + + emeter_status = await kasa.emeter_realtime + if emeter_status is None: + fails += 1 + logger.error( + f"Polling {kasa.name} emeter ({kasa.host}) failed {fails} times" + ) + else: + fails = 0 + await main_events_q.put( + KasaEmeterEvent(name=kasa.name, emeter_status=str(emeter_status)) + ) + await _sleep_with_jitter(kasa.emeter_poll_interval) + + +async def _sleep_with_jitter(interval): + await asyncio.sleep(interval) + + # In order to avoid all processes sleeping and waking up at the same time, + # add a little jitter. Pick a value between 0 and 1.2 seconds + jitter = random.randint(99, 1201) + jitterSleep = float(jitter) / 1000 + await asyncio.sleep(jitterSleep) + + +async def handle_kasa_requests(kasa: Kasa): + while True: + if not kasa.started: + logger.debug(f"{kasa.name} waiting to get started by poller") + await asyncio.sleep(3) + continue + + kasa_state_event = await kasa.recv_q.get() + wanted_state = kasa_state_event.state + if wanted_state != kasa.curr_state: + logger.info( + f"{kasa.name} changing state to {kasa.state_name(wanted_state)}" + ) + if wanted_state: + await kasa.turn_on() + else: + await kasa.turn_off() + else: + logger.debug( + f"{kasa.name} state unchanged as {kasa.state_name(wanted_state)}" + ) + kasa.recv_q.task_done() diff --git a/build/lib/mqtt2kasa/keep_alive.py b/build/lib/mqtt2kasa/keep_alive.py new file mode 100644 index 0000000..5577cea --- /dev/null +++ b/build/lib/mqtt2kasa/keep_alive.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python +import asyncio +from collections import namedtuple +from datetime import datetime +from typing import Dict + +from mqtt2kasa import log +from mqtt2kasa.config import Cfg +from mqtt2kasa.events import MqttMsgEvent, KasaStateEvent +from mqtt2kasa.kasa_wrapper import Kasa + +logger = log.getLogger() + + +class KeepAlive: + def __init__(self, **attrs): + self.attrs = self._dict_to_attrs(attrs) + self._check_expected_attrs() + self.keep_alives_counter = 0 + self.last_send_ts = datetime.now() + self.last_receive_ts = datetime.now() + self.last_receive_value = None + + def __getattr__(self, attr): + try: + return getattr(self.attrs, attr) + except AttributeError as e: + raise AttributeError(f"KeepAlive object is missing {attr} attribute") from e + + def _check_expected_attrs(self): + expected_attrs = [ + ("location_name", str), + ("interval", int), + ("timeout", int), + ("publish_topic", str), + ("subscribe_topic", str), + ] + for attr, attr_type in expected_attrs: + val = getattr(self, attr) + if not isinstance(val, attr_type): + raise AttributeError(f"{attr} attribute is not type {attr_type}") + + @staticmethod + def _dict_to_attrs(params_dict): + cls = namedtuple("Attrs", params_dict) + cls.__new__.__defaults__ = tuple(params_dict.values()) + return cls() + + +async def handle_main_event_mqtt_ka( + mqtt_msg: MqttMsgEvent, kasa: Kasa, ka: KeepAlive, mqtt_send_q: asyncio.Queue +): + ka.last_receive_ts = datetime.now() + if mqtt_msg.payload: + ka.last_receive_value = mqtt_msg.payload + + if kasa.curr_state: + logger.debug(f"Received keep alive from {ka.location_name}") + return + + logger.info( + f"Received keep alive for {ka.location_name} triggering device to be turned on" + ) + try: + kasa.recv_q.put_nowait(KasaStateEvent(name=ka.location_name, state=True)) + await mqtt_send_q.put( + MqttMsgEvent(topic=kasa.topic, payload=kasa.state_name(True)) + ) + except asyncio.queues.QueueFull: + logger.warning( + f"Device {ka.location_name} is too busy to take request to be set as " + f"{kasa.state_name(True)}" + ) + + +async def handle_keep_alives( + kasas: Dict[str, Kasa], kas: Dict[str, KeepAlive], mqtt_send_q: asyncio.Queue +): + if not kas: + logger.info( + "No keep alives to monitor based on config: handle_keep_alives is done." + ) + return + + ka_task_interval = Cfg().keep_alive_task_interval + while True: + await asyncio.sleep(ka_task_interval) + + for name, ka in kas.items(): + kasa = kasas[name] + if not kasa.curr_state: + # if device is not on, we are not interested in it + ka.keep_alives_counter = 0 + continue + + now = datetime.now() + # see if it is time to poke keep alive watchdog topic + tdelta = now - ka.last_send_ts + tdeltaSecs = int(tdelta.total_seconds()) + interval = max(ka_task_interval * 2, ka.interval) + if tdeltaSecs >= interval: + if ka.publish_topic: + await mqtt_send_q.put( + MqttMsgEvent( + topic=ka.publish_topic, payload=ka.last_receive_value + ) + ) + ka.last_send_ts = datetime.now() + # reset last_send_ts on the first ka send after activation + if not ka.keep_alives_counter: + ka.last_receive_ts = ka.last_send_ts + ka.keep_alives_counter += 1 + continue + + # see if it has been too long w/out an answer + tdelta = now - ka.last_receive_ts + tdeltaSecs = int(tdelta.total_seconds()) + if tdeltaSecs >= ka.timeout and ka.keep_alives_counter: + logger.info(f"Keep alive for {name} expired after {tdeltaSecs} seconds") + try: + kasa.recv_q.put_nowait(KasaStateEvent(name=name, state=False)) + await mqtt_send_q.put( + MqttMsgEvent(topic=kasa.topic, payload=kasa.state_name(False)) + ) + except asyncio.queues.QueueFull: + logger.warning( + f"Device {name} is too busy to take request to be set as " + f"{kasa.state_name(False)}" + ) diff --git a/build/lib/mqtt2kasa/log.py b/build/lib/mqtt2kasa/log.py new file mode 100644 index 0000000..5335a18 --- /dev/null +++ b/build/lib/mqtt2kasa/log.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +import logging +from logging.handlers import SysLogHandler +from os import path + + +def getLogger(): + return logging.getLogger("mqtt2kasa") + + +def _log_handler_address(files=tuple()): + try: + return next(f for f in files if path.exists(f)) + except StopIteration: + logging.warning( + "Invalid files: %s. Using stdout as fallback." % ", ".join(files) + ) + return None + + +def initLogger(testing=False): + logger = getLogger() + logger.setLevel(logging.INFO) + + format = ( + "%(asctime)s [mqtt2kasa] %(module)12s:%(lineno)-d %(levelname)-8s %(message)s" + ) + formatter = logging.Formatter(format) + + # Logs are normally configured here: /etc/rsyslog.d/* + logHandlerAddress = _log_handler_address( + ["/run/systemd/journal/syslog", "/var/run/syslog", "/device/log"] + ) + + if logHandlerAddress: + syslog = SysLogHandler( + address=logHandlerAddress, facility=SysLogHandler.LOG_DAEMON + ) + syslog.setFormatter(formatter) + logger.addHandler(syslog) + else: + stdout_handler = logging.StreamHandler() + stdout_handler.setFormatter(formatter) + logger.addHandler(stdout_handler) + + if testing: + log_to_console() + set_log_level_debug() + + +def log_to_console(): + consoleHandler = logging.StreamHandler() + format = "%(asctime)s %(module)12s:%(lineno)-d %(levelname)-8s %(message)s" + formatter = logging.Formatter(format) + consoleHandler.setFormatter(formatter) + getLogger().addHandler(consoleHandler) + + +def set_log_level_debug(): + getLogger().setLevel(logging.DEBUG) diff --git a/build/lib/mqtt2kasa/main.py b/build/lib/mqtt2kasa/main.py new file mode 100644 index 0000000..06c2aaf --- /dev/null +++ b/build/lib/mqtt2kasa/main.py @@ -0,0 +1,283 @@ +#!/usr/bin/env python +import asyncio +import collections +from contextlib import AsyncExitStack +import re + +from aiomqtt import Client, MqttError + +from mqtt2kasa import log +from mqtt2kasa.config import Cfg +from mqtt2kasa.events import KasaStateEvent, KasaEmeterEvent, MqttMsgEvent +from mqtt2kasa.kasa_wrapper import ( + Kasa, + handle_kasa_poller, + handle_kasa_emeter_poller, + handle_kasa_requests, +) +from mqtt2kasa.keep_alive import ( + KeepAlive, + handle_keep_alives, + handle_main_event_mqtt_ka, +) +from mqtt2kasa.mqtt import ( + handle_mqtt_publish, + handle_mqtt_messages, +) + + +class RunState: + def __init__(self): + self.kasas: dict[str, Kasa] = {} + self.topics: dict[str, str] = {} + self.keep_alives: dict[str, KeepAlive] = {} + self.keep_alive_topics: dict[str, str] = {} + + +async def handle_main_event_kasa( + kasa_state: KasaStateEvent, run_state: RunState, mqtt_send_q: asyncio.Queue +): + kasa = run_state.kasas.get(kasa_state.name) + if not kasa: + logger.warning( + f"Unable to find device with name {kasa_state.name}. Ignoring kasa event" + ) + return + payload = kasa.state_name(kasa_state.state) + logger.info( + f"Kasa event requesting mqtt for {kasa_state.name} to publish" + f" {kasa.topic} as {payload}" + ) + await mqtt_send_q.put(MqttMsgEvent(topic=kasa.topic, payload=payload)) + + +async def handle_emeter_event_kasa( + kasa_emeter: KasaEmeterEvent, run_state: RunState, mqtt_send_q: asyncio.Queue +): + kasa = run_state.kasas.get(kasa_emeter.name) + if not kasa: + logger.warning( + f"Unable to find device with name {kasa_emeter.name}. Ignoring kasa emeter event" + ) + return + topic = f"{kasa.topic}/emeter" + payload = kasa_emeter.emeter_status + logger.info( + f"Kasa emeter event requesting mqtt for {kasa_emeter.name} to publish" + f" {topic} as {payload}" + ) + await mqtt_send_q.put(MqttMsgEvent(topic=topic, payload=payload)) + + # also publish each value as a topic + # https://github.com/flavio-fernandes/mqtt2kasa/issues/10 + matches = re.findall(r"(\w+)=([^\s>]+)", payload) + for key, value in matches: + emeter_topic = f"{topic}/{key}" + await mqtt_send_q.put(MqttMsgEvent(topic=emeter_topic, payload=value)) + + +async def handle_main_event_mqtt( + mqtt_msg: MqttMsgEvent, run_state: RunState, mqtt_send_q: asyncio.Queue +): + name = run_state.topics.get(mqtt_msg.topic) + is_ka = name is None + if not name: + # topic is not used directly for a kasa device. Check if it is a keep alive subscribe + name = run_state.keep_alive_topics.get(mqtt_msg.topic) + if not name: + logger.warning( + f"Unable to map device from topic {mqtt_msg.topic}. Ignoring mqtt event" + ) + return + kasa = run_state.kasas[name] + if is_ka: + ka = run_state.keep_alives[name] + await handle_main_event_mqtt_ka(mqtt_msg, kasa, ka, mqtt_send_q) + return + if not mqtt_msg.payload: + logger.debug(f"No payload for topic {mqtt_msg.topic}. Ignoring mqtt event") + return + try: + translated, new_state = kasa.state_parse(mqtt_msg.payload) + if translated: + await mqtt_send_q.put( + MqttMsgEvent(topic=mqtt_msg.topic, payload=translated) + ) + return + except ValueError as e: + logger.warning(f"Unexpected payload for topic {mqtt_msg.topic}: {e}") + return + try: + kasa.recv_q.put_nowait(KasaStateEvent(name=name, state=new_state)) + except asyncio.queues.QueueFull: + logger.warning( + f"Device {name} is too busy to take request to be set as " + f"{kasa.state_name(new_state)}" + ) + return + msg = f"Mqtt event causing device {name} to be set as {kasa.state_name(new_state)}" + if kasa.state_name(new_state) != mqtt_msg.payload: + msg += f" ({mqtt_msg.payload})" + logger.info(msg) + + +async def handle_main_events( + run_state: RunState, mqtt_send_q: asyncio.Queue, main_events_q: asyncio.Queue +): + handlers = { + "KasaStateEvent": handle_main_event_kasa, + "KasaEmeterEvent": handle_emeter_event_kasa, + "MqttMsgEvent": handle_main_event_mqtt, + } + while True: + main_event = await main_events_q.get() + logger.debug(f"Handling {main_event.event}...") + handler = handlers.get(main_event.event) + if handler: + await handler(main_event, run_state, mqtt_send_q) + else: + logger.error(f"No handler found for {main_event.event}") + main_events_q.task_done() + + +async def cancel_tasks(tasks): + logger.info("Cancelling all tasks") + for task in tasks: + if task.done(): + continue + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + +async def main_loop(): + global stop_gracefully + + # used to be: https://pypi.org/project/asyncio-mqtt/ + # https://pypi.org/project/aiomqtt/ + logger.debug("Starting main event processing loop") + cfg = Cfg() + mqtt_broker_ip = cfg.mqtt_host + mqtt_client_id = cfg.mqtt_client_id + mqtt_username = cfg.mqtt_username + mqtt_password = cfg.mqtt_password + mqtt_send_q = asyncio.Queue(maxsize=256) + main_events_q = asyncio.Queue(maxsize=256) + + # We 💛 context managers. Let's create a stack to help + # us manage them. + async with AsyncExitStack() as stack: + # Keep track of the asyncio tasks that we create, so that + # we can cancel them on exit + tasks = set() + stack.push_async_callback(cancel_tasks, tasks) + + client = Client( + mqtt_broker_ip, + username=mqtt_username, + password=mqtt_password, + client_id=mqtt_client_id, + ) + await stack.enter_async_context(client) + + messages = await stack.enter_async_context(client.unfiltered_messages()) + task = asyncio.create_task(handle_mqtt_messages(messages, main_events_q)) + tasks.add(task) + + task = asyncio.create_task(handle_mqtt_publish(client, mqtt_send_q)) + tasks.add(task) + + run_state = RunState() + for name, config in cfg.locations.items(): + topic = cfg.mqtt_topic(name) + if topic in run_state.topics: + raise RuntimeError( + f"Topic {topic} assigned to more than one device: " + f"{name} and {run_state.topics[topic]}" + ) + run_state.topics[topic] = name + await client.subscribe(topic) + run_state.kasas[name] = Kasa(name, topic, config) + + for kasa in run_state.kasas.values(): + tasks.add(asyncio.create_task(handle_kasa_poller(kasa, main_events_q))) + tasks.add(asyncio.create_task(handle_kasa_requests(kasa))) + if kasa.emeter_poll_interval: + tasks.add( + asyncio.create_task(handle_kasa_emeter_poller(kasa, main_events_q)) + ) + + for name, config in cfg.keep_alives.items(): + if name not in run_state.kasas: + raise RuntimeError( + f"Keep alive {name} must have a corresponding location entry" + ) + config["location_name"] = name + ka = KeepAlive(**config) + topic = ka.subscribe_topic + if topic in run_state.topics or topic in run_state.keep_alive_topics: + raise RuntimeError( + f"Subscribe topic {topic} for keep alive {name} is not unique" + ) + run_state.keep_alive_topics[topic] = name + await client.subscribe(ka.subscribe_topic) + run_state.keep_alives[name] = ka + + task = asyncio.create_task( + handle_keep_alives(run_state.kasas, run_state.keep_alives, mqtt_send_q) + ) + tasks.add(task) + + task = asyncio.create_task( + handle_main_events(run_state, mqtt_send_q, main_events_q) + ) + tasks.add(task) + + # Wait for everything to complete (or fail due to, e.g., network errors) + await asyncio.gather(*tasks) + + logger.debug("all done!") + + +# cfg_globals +stop_gracefully = False +logger = None + + +async def main(): + global stop_gracefully + + # Run the advanced_example indefinitely. Reconnect automatically + # if the connection is lost. + reconnect_interval = Cfg().reconnect_interval + while not stop_gracefully: + try: + await main_loop() + except MqttError as error: + logger.warning( + f'MQTT error "{error}". Reconnecting in {reconnect_interval} seconds.' + ) + except (KeyboardInterrupt, SystemExit): + logger.info("got KeyboardInterrupt") + stop_gracefully = True + break + await asyncio.sleep(reconnect_interval) + + +if __name__ == "__main__": + logger = log.getLogger() + log.initLogger() + + knobs = Cfg().knobs + if isinstance(knobs, collections.abc.Mapping): + if knobs.get("log_to_console"): + log.log_to_console() + if knobs.get("log_level_debug"): + log.set_log_level_debug() + + logger.debug("mqtt2kasa process started") + asyncio.run(main()) + if not stop_gracefully: + raise RuntimeError("main is exiting") diff --git a/build/lib/mqtt2kasa/mqtt.py b/build/lib/mqtt2kasa/mqtt.py new file mode 100644 index 0000000..056c2e8 --- /dev/null +++ b/build/lib/mqtt2kasa/mqtt.py @@ -0,0 +1,39 @@ +import asyncio + +from mqtt2kasa import log +from mqtt2kasa.config import Cfg +from mqtt2kasa.events import MqttMsgEvent + +logger = log.getLogger() + + +async def handle_mqtt_publish(client, mqtt_send_q: asyncio.Queue): + c = Cfg() + mqtt_qos = c.mqtt_qos + mqtt_retain = c.mqtt_retain + logger.info( + f"handle_mqtt_publish task started. Using retain:{mqtt_retain} and qos:{mqtt_qos}" + ) + while True: + mqtt_msg = await mqtt_send_q.get() + topic, payload = mqtt_msg.topic, mqtt_msg.payload + # logger.debug(f"Publishing: {topic} {payload}") + try: + await client.publish( + topic, payload, timeout=15, qos=mqtt_qos, retain=mqtt_retain + ) + logger.debug(f"Published: {topic} {payload}") + except Exception as e: + logger.error("client failed publish mqtt %s %s : %s", topic, payload, e) + mqtt_send_q.task_done() + # Dampen publishes. This is a fail-safe and should not affect anything unless + # there is a bug lurking somewhere + await asyncio.sleep(1) + + +async def handle_mqtt_messages(messages, main_events_q: asyncio.Queue): + async for message in messages: + msg_topic = message.topic + msg_payload = message.payload.decode() + logger.debug(f"Received mqtt topic:{msg_topic} payload:{msg_payload}") + await main_events_q.put(MqttMsgEvent(topic=msg_topic, payload=msg_payload)) diff --git a/build/lib/mqtt2kasa/tests/__init__.py b/build/lib/mqtt2kasa/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/mqtt2kasa/tests/basic_test.sh.vagrant b/build/lib/mqtt2kasa/tests/basic_test.sh.vagrant new file mode 100644 index 0000000..3aebc38 --- /dev/null +++ b/build/lib/mqtt2kasa/tests/basic_test.sh.vagrant @@ -0,0 +1,83 @@ +#!/bin/bash + +#set -o xtrace +set -o errexit + +MQTT_BROKER='192.168.123.123' +TMP_OUTPUT=/tmp/basic_test.tmp + +get_log_lines () { + NUM_LINES=${1:-3} + sleep 1.2 ; # give it a sec or 2 to finish... + sudo journalctl -u mqtt2kasa.service --no-pager --lines=${NUM_LINES} --output=cat > ${TMP_OUTPUT} +} + +get_simulator_lines () { + NUM_LINES=${1:-3} + sleep 1.2 ; # give it a sec or 2 to finish... + sudo journalctl -u tplink-smarthome-simulator.service --no-pager --lines=${NUM_LINES} --output=cat > ${TMP_OUTPUT} +} + +# restart service to trigger discovery +sudo systemctl restart mqtt2kasa +sleep 10 +echo TEST: Check discovery +get_log_lines 15 +grep --quiet -E 'Discovered 192\.168\.123\.201 .*thing1' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +grep --quiet -E 'Discovered 192\.168\.123\.202 .*thing2' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +grep --quiet -E 'Discovered 192\.168\.123\.203 .* thing3' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } + +echo TEST: EMeter +grep --quiet -E 'bar has no emeter' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +grep --quiet -E 'emeter event requesting mqtt for lar to publish /lar/switch/emeter' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } + +echo TEST: Check on/off +mosquitto_pub -h ${MQTT_BROKER} -t /foo -m "ofF" +get_log_lines +grep --quiet -E 'Mqtt event causing device foo to be set as off' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } + +mosquitto_pub -h ${MQTT_BROKER} -t /kasa/device/bar -m "On" +get_log_lines +grep --quiet -E 'Mqtt event causing device bar to be set as on' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } + +mosquitto_pub -h ${MQTT_BROKER} -t /lar/switch -m "1" +get_log_lines +grep --quiet -E 'Mqtt event causing device lar to be set as on' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } + +echo TEST: Check toggle +mosquitto_pub -h ${MQTT_BROKER} -t /foo -m "toggle" +get_log_lines +grep --quiet -E 'Mqtt event causing device foo to be set as on' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +get_simulator_lines +grep --quiet -E 'set_relay_state.*"state": 1' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +grep --quiet -E '192\.168\.123\.201' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +grep --quiet -E '"err_code":0' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } + +mosquitto_pub -h ${MQTT_BROKER} -t /lar/switch -m "flip" +get_log_lines +grep --quiet -E 'Mqtt event causing device lar to be set as off' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +get_simulator_lines +grep --quiet -E 'set_relay_state.*"state": 0' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +grep --quiet -E '192\.168\.123\.203' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +grep --quiet -E '"err_code":0' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } + + +echo 'PASSED: Happy happy, joy joy!' +rm -f ${TMP_OUTPUT} +exit 0 diff --git a/build/lib/mqtt2kasa/tests/simulator.js.vagrant b/build/lib/mqtt2kasa/tests/simulator.js.vagrant new file mode 100644 index 0000000..c1c8ef2 --- /dev/null +++ b/build/lib/mqtt2kasa/tests/simulator.js.vagrant @@ -0,0 +1,36 @@ +const { Device } = require('..'); +const { UdpServer } = require('..'); + +const devices = []; + +devices.push( + new Device({ + port: 9999, + responseDelay: 100, + address: '192.168.123.201', + model: 'hs100', + data: { alias: 'Mock HS100 thing1', mac: '00:01:01:01:01:01', deviceId: 'thing1' }, + }) +); +devices.push( + new Device({ + port: 9999, + address: '192.168.123.202', + model: 'hs105', + data: { alias: 'Mock HS105 thing2', mac: '00:02:02:02:02:02', deviceId: 'thing2' }, + }) +); +devices.push( + new Device({ + port: 9999, + address: '192.168.123.203', + model: 'hs110', + data: { alias: 'Mock HS110 thing3', mac: '00:03:03:03:03:03', deviceId: 'thing3' }, + }) +); + +devices.forEach((d) => { + d.start(); +}); + +UdpServer.start(); diff --git a/build/lib/mqtt2kasa/tests/tplink-smarthome-simulator.service.vagrant b/build/lib/mqtt2kasa/tests/tplink-smarthome-simulator.service.vagrant new file mode 100644 index 0000000..e3f3032 --- /dev/null +++ b/build/lib/mqtt2kasa/tests/tplink-smarthome-simulator.service.vagrant @@ -0,0 +1,17 @@ +[Unit] +Description=tplink-smarthome-simulator +After=network.target + +[Service] +User=vagrant +Group=vagrant +Environment=PATH=/usr/bin/ +Environment=DEBUG='*,-device:udp,*:error node test/simulator.js' +ExecStartPre=/bin/bash -c 'while :; do [ -e /vagrant/mqtt2kasa/bin/mqtt2kasa.service.vagrant ] && break; /bin/sleep 1; done' +ExecStart=/usr/bin/node test/simulator.js +WorkingDirectory=/home/vagrant/tplink-smarthome-simulator +Restart=always + +[Install] +WantedBy=multi-user.target +Alias=tplink-smarthome-simulator.service diff --git a/build/lib/mqtt2kasa/tests/unit/__init__.py b/build/lib/mqtt2kasa/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/mqtt2kasa/tests/unit/test_pass.py b/build/lib/mqtt2kasa/tests/unit/test_pass.py new file mode 100644 index 0000000..61710a1 --- /dev/null +++ b/build/lib/mqtt2kasa/tests/unit/test_pass.py @@ -0,0 +1,2 @@ +def test_run(): + pass diff --git a/data/config.yaml b/data/config.yaml index af71b7b..dddb5fd 100644 --- a/data/config.yaml +++ b/data/config.yaml @@ -3,50 +3,231 @@ knobs: # Note: normally you don't set these... here just to show how to # devel and debug - # log_to_console: false - # log_level_debug: false + log_to_console: false + log_level_debug: false mqtt: # ip/dns for the mqtt broker - host: 192.168.1.250 + host: 192.168.44.229 + #username: shelly + #password: shelly + #client_id: kasa_hellow # uncomment to publish with retain - # retain: true + retain: true # specify qos value (default is 0) - # qos: 1 + qos: 2 globals: # every location will be managed using a unique mqtt topic # unless explicitly specified, this format will be used topic_format: /{}/switch # kasa will monitor the current state of the device every # poll interval, in seconds. You can override on a per device - poll_interval: 11 + poll_interval: 55 locations: # coffee maker. To turn it on, use mqtt publish # topic: /coffee_maker/switch payload: on # subscribe to /coffee_maker/switch to know its state - coffee_maker: - host: 192.168.1.21 + # coffee_maker: + # host: 192.168.1.21 # toaster is similar to the coffee maker, except it relies on # kasa discovery in order to locate the device via its alias. - toaster: - alias: toaster + # toaster: + # alias: toaster # example where topic is explicitly provided for a given device - kitchen lights: - host: 192.168.1.22 - topic: /kitchen/light_switch + # kitchen lights: + # host: 192.168.1.22 + # topic: /kitchen/light_switch # example where we indicate a specific poll interval. # Also, adding a task to publish emeter info at provided interval - pantry: - alias: storage - poll_interval: 120 - emeter_poll_interval: 600 -keep_alives: + + + + + router: + #host: 192.168.66.176 + host: 192.168.66.43 + poll_interval: 55 + emeter_poll_interval: 55 + topic: kasa/laundry/router_rack + + #router: + # host: 192.168.66.40 + # poll_interval: 55 + # emeter_poll_interval: 55 + # topic: kasa/laundry/router + + washing_machine: + host: 192.168.66.107 + poll_interval: 55 + emeter_poll_interval: 55 + topic: kasa/laundry/washing_machine + + master_fan: + host: 192.168.66.60 + poll_interval: 55 + emeter_poll_interval: 55 + topic: kasa/master/night_fan + + #fireplace_pm: + # host: 192.168.66.176 + # poll_interval: 55 + # emeter_poll_interval: 55 + # topic: kasa/gameroom/fireplace + + garage_fridge: + host: 192.168.66.224 + poll_interval: 55 + emeter_poll_interval: 55 + topic: kasa/garage/fridge + + office_stereo: + host: 192.168.66.201 + poll_interval: 55 + emeter_poll_interval: 55 + topic: kasa/office/stereo + + office_bathroom: + host: 192.168.66.197 + poll_interval: 55 + topic: kasa/office/bathroom + + office_closet: + host: 192.168.66.15 + poll_interval: 55 + emeter_poll_interval: 55 + topic: kasa/office/closet + + office_shrine: + host: 192.168.66.176 + poll_interval: 55 + emeter_poll_interval: 55 + topic: kasa/office/shrine + + office_desk: + host: 192.168.66.234 + poll_interval: 55 + emeter_poll_interval: 55 + topic: kasa/office/desk + + #v2 needs authentication + #gameroom_console: + # host: 192.168.66.140 + # poll_interval: 55 + # emeter_poll_interval: 55 + # topic: kasa/gameroom/console + + kitchen_dishwasher: + host: 192.168.66.234 + poll_interval: 55 + emeter_poll_interval: 55 + topic: kasa/kitchen/dishwasher + + kitchen_fridge: + host: 192.168.66.164 + poll_interval: 55 + emeter_poll_interval: 55 + topic: kasa/kitchen/fridge + + living_tv: + host: 192.168.66.144 + poll_interval: 55 + emeter_poll_interval: 55 + topic: kasa/livingroom/tv + + alexa_bathroom_downstairs: + host: 192.168.66.38 + poll_interval: 55 + emeter_poll_interval: 55 + topic: kasa/kids/alexa_bathroom_downstairs + + family_computer: + host: 192.168.66.96 + poll_interval: 55 + emeter_poll_interval: 55 + topic: kasa/kitchen/family_computer + + weather_machine: + host: 192.168.66.31 + poll_interval: 55 + topic: kasa/gameroom/weather_machine + + bathroom_downstairs_light: + host: 192.168.66.252 + poll_interval: 55 + topic: kasa/kids/bathroom_downstairs_light + + connor_fan: + host: 192.168.66.35 + poll_interval: 55 + topic: kasa/connor/fan_ceiling + + brook_fan: + host: 192.168.66.49 + poll_interval: 55 + topic: kasa/brook/fan_ceiling + + gameroom_fan: + host: 192.168.66.126 + poll_interval: 55 + topic: kasa/gameroom/fan_ceiling + + gameroom_lights: + host: 192.168.66.227 + poll_interval: 55 + topic: kasa/gameroom/lights_ceiling + + garage_light: + host: 192.168.66.68 + poll_interval: 55 + topic: kasa/garage/light + + livingroom_lights: + host: 192.168.66.50 + poll_interval: 55 + topic: kasa/livingroom/lights + + livingroom_fan: + host: 192.168.66.150 + poll_interval: 55 + topic: kasa/livingroom/fan + + master_bath_lights: + host: 192.168.66.193 + poll_interval: 55 + topic: kasa/master/bath_lights + + master_poop_fan: + host: 192.168.66.249 + poll_interval: 55 + topic: kasa/master/bath_poop_fan + + master_shower_fan: + host: 192.168.66.173 + poll_interval: 55 + topic: kasa/master/bath_shower_fan + + master_lights: + host: 192.168.66.216 + poll_interval: 55 + topic: kasa/master/lights + + kids_bathroom_upstairs: + host: 192.168.66.151 + poll_interval: 55 + topic: kasa/kids/bathroom_upstairs_light + + kitchen_pantry: + host: 192.168.66.157 + poll_interval: 55 + topic: kasa/kitchen/pantry_light + +#keep_alives: # this is a very optional thing but can be useful. It will monitor a # specific topic to determine if a device should be on or off. The # amount of time it has been since it received something in the subscribe # topic is the trigger for turning the location on/off. A usage example # is available here: https://github.com/flavio-fernandes/imalive - toaster: # the location affected by this keep-alive - interval: 10 # how often to send a ping, in seconds - timeout: 60 # how long w/out a 'pong' to turn off device - publish_topic: /toaster/ping # set this to '' if a ping is not needed - subscribe_topic: /toaster/pong +# toaster: # the location affected by this keep-alive +# interval: 10 # how often to send a ping, in seconds +# timeout: 55 # how long w/out a 'pong' to turn off device +# publish_topic: /toaster/ping # set this to '' if a ping is not needed +# subscribe_topic: /toaster/pong diff --git a/mqtt2kasa/main.py b/mqtt2kasa/main.py index 06c2aaf..28d67c9 100755 --- a/mqtt2kasa/main.py +++ b/mqtt2kasa/main.py @@ -3,9 +3,9 @@ import collections from contextlib import AsyncExitStack import re - +import json from aiomqtt import Client, MqttError - +from datetime import datetime, timezone from mqtt2kasa import log from mqtt2kasa.config import Cfg from mqtt2kasa.events import KasaStateEvent, KasaEmeterEvent, MqttMsgEvent @@ -50,6 +50,15 @@ async def handle_main_event_kasa( ) await mqtt_send_q.put(MqttMsgEvent(topic=kasa.topic, payload=payload)) + utc_time = datetime.now(timezone.utc) + epoch_utc_secs = int(utc_time.timestamp()) + json_topic = f"{kasa.topic}/status_json" + s = {} + s["timestamp"] = epoch_utc_secs + s["state"] = kasa.state_name(kasa_state.state) + jStr = json.dumps(s) + await mqtt_send_q.put(MqttMsgEvent(topic=json_topic, payload=jStr)) + async def handle_emeter_event_kasa( kasa_emeter: KasaEmeterEvent, run_state: RunState, mqtt_send_q: asyncio.Queue @@ -68,12 +77,27 @@ async def handle_emeter_event_kasa( ) await mqtt_send_q.put(MqttMsgEvent(topic=topic, payload=payload)) + utc_time = datetime.now(timezone.utc) + epoch_utc_secs = int(utc_time.timestamp()) + timestamp_topic = f"{topic}/timestamp" + + kD = {} # also publish each value as a topic # https://github.com/flavio-fernandes/mqtt2kasa/issues/10 matches = re.findall(r"(\w+)=([^\s>]+)", payload) for key, value in matches: emeter_topic = f"{topic}/{key}" + kD[key] = value await mqtt_send_q.put(MqttMsgEvent(topic=emeter_topic, payload=value)) + + await mqtt_send_q.put(MqttMsgEvent(topic=timestamp_topic, payload=epoch_utc_secs)) + + json_topic = f"{topic}/emeter_json" + kD["timestamp"] = epoch_utc_secs + jStr = json.dumps(kD) + await mqtt_send_q.put(MqttMsgEvent(topic=json_topic, payload=jStr)) + + async def handle_main_event_mqtt( diff --git a/requirements.txt b/requirements.txt index d0951a6..c7c3d78 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -aiomqtt < 2.0 +aiomqtt asyncio-throttle paho-mqtt python-kasa diff --git a/setup.cfg b/setup.cfg index a9b4d39..82e2f25 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = mqtt2kasa -version = 0.0.1 +version = 0.0.3 summary = MQTT front end wrapper to python-kasa author = Flavio Fernandes home-page = https://flaviof.com