Skip to content

Commit

Permalink
Add support for publishing emeter information
Browse files Browse the repository at this point in the history
Use emeter_poll_interval on a per device or under globals section
of the config in order to have emeter values published.

The topic for the emeter values will be published with 'emeter'
topic. Example:  '/+/switch/emeter'

Fixes: #6
Signed-off-by: Flavio Fernandes <[email protected]>
  • Loading branch information
flavio-fernandes committed Aug 19, 2023
1 parent 9269fba commit 17096b0
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 17 deletions.
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ globals:
# kasa will monitor the current state of the device every
# poll interval, in seconds. You can override on a per device
poll_interval: 11
# if devices support metering (aka emeter), use this poll
# interval to publish it. You can override on a per device
# emeter_poll_interval: 600
locations:
# coffee maker. To turn it on, use mqtt publish
# topic: /coffee_maker/switch payload: on
Expand All @@ -36,10 +39,11 @@ locations:
kitchen lights:
host: 192.168.1.22
topic: /kitchen/light_switch
# example where we indicate a specific poll interval
# example where we indicate a specific poll intervals
pantry:
alias: storage
poll_interval: 120
emeter_poll_interval: 1800
```
Devices are connected via **host** or discovered by Kasa via **alias**. There are more attributes
Expand Down Expand Up @@ -76,10 +80,16 @@ $ MQTT=192.168.1.250 && \
2021-01-30T21:43:03-0500 : 0 : /toaster/switch : on
```

**NOTE on Metering**: If metering is supported by device and `emeter_poll_interval` was provided, it will be published via topics that end with "emeter":

```
$ mosquitto_sub -h $MQTT -t '/+/switch/emeter'
```

In order to damper endless on/off cycles, this implementation sets an
[async throttle](https://pypi.org/project/asyncio-throttle/) for each device.
If there is a need to tweak that, the attributes are located in
[kasa_wrapper.py](https://github.com/flavio-fernandes/mqtt2kasa/blob/main/mqtt2kasa/kasa_wrapper.py#L26-L27).
[kasa_wrapper.py](https://github.com/flavio-fernandes/mqtt2kasa/blob/60e37a8e527a04eee54853d42366de314c10cefe/mqtt2kasa/kasa_wrapper.py#L30-L31).

**NOTE:** Use python 3.7 or newer, as this project requires a somewhat
recent implementation of [asyncio](https://realpython.com/async-io-python/).
Expand Down
4 changes: 3 additions & 1 deletion data/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ locations:
kitchen lights:
host: 192.168.1.22
topic: /kitchen/light_switch
# example where we indicate a specific poll interval
# example where we indicate a specific poll interval.
# Also, adding a task to publish emeter info at provided interval
pantry:
alias: storage
poll_interval: 120
emeter_poll_interval: 600
keep_alives:
# this is a very optional thing but can be useful. It will monitor a
# specific topic to determine if a device should be on or off. The
Expand Down
6 changes: 5 additions & 1 deletion data/config.yaml.vagrant
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ knobs:
globals:
# test with slow polls to make log less confusing
poll_interval: 3600
# emeter_poll_interval: 0
# topic_format: /kasa/device/{}
locations:
foo:
Expand All @@ -18,8 +19,11 @@ locations:
bar:
# host: 192.168.123.202
alias: Mock HS105 thing2
# this device does not have emeter capabilities
emeter_poll_interval: 888
lar:
topic: /lar/switch
host: 192.168.123.203
# alias: Mock HS110 thing3

# this device has emeter capabilities
emeter_poll_interval: 888
12 changes: 12 additions & 0 deletions mqtt2kasa/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,18 @@ def poll_interval(self, location_name):
cfg_globals.get("poll_interval") or const.KASA_DEFAULT_POLL_INTERVAL
)

def emeter_poll_interval(self, location_name):
locations = self._get_info().locations
if isinstance(locations, collections.abc.Mapping):
location_attributes = locations.get(location_name, {})
if location_attributes.get("emeter_poll_interval"):
return float(location_attributes["emeter_poll_interval"])
cfg_globals = self._get_info().cfg_globals
return float(
cfg_globals.get("emeter_poll_interval")
or const.KASA_DEFAULT_EMETER_POLL_INTERVAL
)

@property
def locations(self):
return self._get_info().locations
Expand Down
1 change: 1 addition & 0 deletions mqtt2kasa/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
MQTT_DEFAULT_BROKER_IP = "192.168.10.238"
MQTT_DEFAULT_RECONNECT_INTERVAL = 13 # [seconds]
KASA_DEFAULT_POLL_INTERVAL = 10 # [seconds]
KASA_DEFAULT_EMETER_POLL_INTERVAL = 0 # [seconds] 0 == disabled
KEEP_ALIVE_DEFAULT_TASK_INTERVAL = 1.5 # [seconds]
6 changes: 6 additions & 0 deletions mqtt2kasa/events.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ class KasaStateEvent(BaseEvent):
def __init__(self, **attrs):
expected_attrs = "name", "state"
super().__init__(expected_attrs, attrs)


class KasaEmeterEvent(BaseEvent):
def __init__(self, **attrs):
expected_attrs = "name", "emeter_status"
super().__init__(expected_attrs, attrs)
64 changes: 56 additions & 8 deletions mqtt2kasa/kasa_wrapper.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from typing import Optional

from asyncio_throttle import Throttler
from kasa import Discover
from kasa import Discover, EmeterStatus
from kasa.smartdevice import SmartDevice, SmartDeviceException

from mqtt2kasa import log
from mqtt2kasa.config import Cfg
from mqtt2kasa.events import KasaStateEvent
from mqtt2kasa.events import KasaStateEvent, KasaEmeterEvent

logger = log.getLogger()

Expand All @@ -26,6 +26,7 @@ def __init__(self, name: str, topic: str, config: dict):
self.host = config.get("host")
self.alias = config.get("alias")
self.poll_interval = Cfg().poll_interval(name)
self.emeter_poll_interval = Cfg().emeter_poll_interval(name)
self.recv_q = asyncio.Queue(maxsize=4)
self.throttler = Throttler(rate_limit=4, period=60)
self.curr_state = None
Expand Down Expand Up @@ -99,6 +100,26 @@ async def turn_off(self):
except SmartDeviceException as e:
logger.error(f"{self.host} unable to turn_off: {e}")

@property
async def has_emeter(self) -> Optional[bool]:
try:
device = await self._get_device()
await device.update()
return device.has_emeter
except SmartDeviceException as e:
logger.error(f"{self.host} unable to get has_emeter: {e}")
# implicit return None

@property
async def emeter_realtime(self) -> Optional[EmeterStatus]:
try:
device = await self._get_device()
await device.update()
return device.emeter_realtime
except SmartDeviceException as e:
logger.error(f"{self.host} unable to fetch emeter: {e}")
# implicit return None

@classmethod
def state_from_name(cls, is_on: Optional[str]) -> bool:
return is_on == cls.STATE_ON
Expand Down Expand Up @@ -168,13 +189,40 @@ async def handle_kasa_poller(kasa: Kasa, main_events_q: asyncio.Queue):
)
)
kasa.curr_state = new_state
await asyncio.sleep(kasa.poll_interval)
await _sleep_with_jitter(kasa.poll_interval)


async def handle_kasa_emeter_poller(kasa: Kasa, main_events_q: asyncio.Queue):
fails = 0
while True:
# chatty
# logger.debug(f"Polling {kasa.name} emeter now. Interval is {kasa.emeter_poll_interval} seconds")
if await kasa.has_emeter == False:
logger.info(f"{kasa.name} has no emeter. no emeter polling is needed")
break

emeter_status = await kasa.emeter_realtime
if emeter_status is None:
fails += 1
logger.error(
f"Polling {kasa.name} emeter ({kasa.host}) failed {fails} times"
)
else:
fails = 0
await main_events_q.put(
KasaEmeterEvent(name=kasa.name, emeter_status=str(emeter_status))
)
await _sleep_with_jitter(kasa.emeter_poll_interval)


async def _sleep_with_jitter(interval):
await asyncio.sleep(interval)

# In order to avoid all processes sleeping and waking up at the same time,
# add a little jitter. Pick a value between 0 and 1.2 seconds
jitter = random.randint(99, 1201)
jitterSleep = float(jitter) / 1000
await asyncio.sleep(jitterSleep)
# In order to avoid all processes sleeping and waking up at the same time,
# add a little jitter. Pick a value between 0 and 1.2 seconds
jitter = random.randint(99, 1201)
jitterSleep = float(jitter) / 1000
await asyncio.sleep(jitterSleep)


async def handle_kasa_requests(kasa: Kasa):
Expand Down
Empty file modified mqtt2kasa/keep_alive.py
100644 → 100755
Empty file.
37 changes: 33 additions & 4 deletions mqtt2kasa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
import collections
from contextlib import AsyncExitStack

from asyncio_mqtt import Client, MqttError
from aiomqtt import Client, MqttError

from mqtt2kasa import log
from mqtt2kasa.config import Cfg
from mqtt2kasa.events import KasaStateEvent, MqttMsgEvent
from mqtt2kasa.kasa_wrapper import Kasa, handle_kasa_poller, handle_kasa_requests
from mqtt2kasa.events import KasaStateEvent, KasaEmeterEvent, MqttMsgEvent
from mqtt2kasa.kasa_wrapper import (
Kasa,
handle_kasa_poller,
handle_kasa_emeter_poller,
handle_kasa_requests,
)
from mqtt2kasa.keep_alive import (
KeepAlive,
handle_keep_alives,
Expand Down Expand Up @@ -45,6 +50,24 @@ async def handle_main_event_kasa(
await mqtt_send_q.put(MqttMsgEvent(topic=kasa.topic, payload=payload))


async def handle_emeter_event_kasa(
kasa_emeter: KasaEmeterEvent, run_state: RunState, mqtt_send_q: asyncio.Queue
):
kasa = run_state.kasas.get(kasa_emeter.name)
if not kasa:
logger.warning(
f"Unable to find device with name {kasa_emeter.name}. Ignoring kasa emeter event"
)
return
topic = f"{kasa.topic}/emeter"
payload = kasa_emeter.emeter_status
logger.info(
f"Kasa emeter event requesting mqtt for {kasa_emeter.name} to publish"
f" {topic} as {payload}"
)
await mqtt_send_q.put(MqttMsgEvent(topic=topic, payload=payload))


async def handle_main_event_mqtt(
mqtt_msg: MqttMsgEvent, run_state: RunState, mqtt_send_q: asyncio.Queue
):
Expand Down Expand Up @@ -95,6 +118,7 @@ async def handle_main_events(
):
handlers = {
"KasaStateEvent": handle_main_event_kasa,
"KasaEmeterEvent": handle_emeter_event_kasa,
"MqttMsgEvent": handle_main_event_mqtt,
}
while True:
Expand Down Expand Up @@ -123,7 +147,8 @@ async def cancel_tasks(tasks):
async def main_loop():
global stop_gracefully

# https://pypi.org/project/asyncio-mqtt/
# used to be: https://pypi.org/project/asyncio-mqtt/
# https://pypi.org/project/aiomqtt/
logger.debug("Starting main event processing loop")
cfg = Cfg()
mqtt_broker_ip = cfg.mqtt_host
Expand Down Expand Up @@ -171,6 +196,10 @@ async def main_loop():
for kasa in run_state.kasas.values():
tasks.add(asyncio.create_task(handle_kasa_poller(kasa, main_events_q)))
tasks.add(asyncio.create_task(handle_kasa_requests(kasa)))
if kasa.emeter_poll_interval:
tasks.add(
asyncio.create_task(handle_kasa_emeter_poller(kasa, main_events_q))
)

for name, config in cfg.keep_alives.items():
if name not in run_state.kasas:
Expand Down
Empty file modified mqtt2kasa/mqtt.py
100644 → 100755
Empty file.
6 changes: 6 additions & 0 deletions mqtt2kasa/tests/basic_test.sh.vagrant
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ grep --quiet -E 'Discovered 192\.168\.123\.202 .*thing2' ${TMP_OUTPUT} || \
grep --quiet -E 'Discovered 192\.168\.123\.203 .* thing3' ${TMP_OUTPUT} || \
{ echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; }

echo TEST: EMeter
grep --quiet -E 'bar has no emeter' ${TMP_OUTPUT} || \
{ echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; }
grep --quiet -E 'emeter event requesting mqtt for lar to publish /lar/switch/emeter' ${TMP_OUTPUT} || \
{ echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; }

echo TEST: Check on/off
mosquitto_pub -h ${MQTT_BROKER} -t /foo -m "ofF"
get_log_lines
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
asyncio-mqtt
aiomqtt
asyncio-throttle
paho-mqtt
python-kasa
Expand Down

0 comments on commit 17096b0

Please sign in to comment.