diff --git a/proxy/Dockerfile b/proxy/Dockerfile index c98c7df..b41e5cb 100644 --- a/proxy/Dockerfile +++ b/proxy/Dockerfile @@ -1,7 +1,8 @@ -FROM python:3.10-alpine +FROM python:3.13-alpine WORKDIR /app COPY requirements.txt /app/requirements.txt RUN pip3 install -r requirements.txt COPY . . CMD ["python3", "server.py"] -EXPOSE 8675 +EXPOSE 8675 +EXPOSE 8685 diff --git a/proxy/requirements.txt b/proxy/requirements.txt index d230597..05ed63e 100644 --- a/proxy/requirements.txt +++ b/proxy/requirements.txt @@ -1,2 +1,6 @@ -pypowerwall==0.12.2 +pypowerwall==0.12.3 bs4==0.0.2 +beautifulsoup4==4.12.3 +requests==2.32.3 +protobuf==5.29.3 +teslapy==2.9.0 diff --git a/proxy/server.py b/proxy/server.py index afe1e00..b8b7161 100755 --- a/proxy/server.py +++ b/proxy/server.py @@ -15,16 +15,16 @@ Local Powerwall Mode The default mode for this proxy is to connect to a local Powerwall to pull data. This works with the Tesla Energy Gateway (TEG) for - Powerwall 1, 2 and +. It will also support pulling /vitals and /strings + Powerwall 1, 2 and +. It will also support pulling /vitals and /strings data if available. Set: PW_HOST to Powerwall Address and PW_PASSWORD to use this mode. Cloud Mode An optional mode is to connect to the Tesla Cloud to pull data. This requires that you have a Tesla Account and have registered your - Tesla Solar System or Powerwall with the Tesla App. It requires that - you run the setup 'python -m pypowerwall setup' process to create the - required API keys and tokens. This mode doesn't support /vitals or + Tesla Solar System or Powerwall with the Tesla App. It requires that + you run the setup 'python -m pypowerwall setup' process to create the + required API keys and tokens. This mode doesn't support /vitals or /strings data. Set: PW_EMAIL and leave PW_HOST blank to use this mode. @@ -44,768 +44,1024 @@ import signal import ssl import sys +import threading import time -from http.server import BaseHTTPRequestHandler, HTTPServer -from socketserver import ThreadingMixIn -from typing import Optional -from urllib.parse import urlparse, parse_qs +import unicodedata +from enum import StrEnum, auto +from http import HTTPStatus +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from typing import Dict, Final, List, Set, Tuple +from urllib.parse import parse_qs, urlparse from transform import get_static, inject_js + import pypowerwall from pypowerwall import parse_version -BUILD = "t67" -ALLOWLIST = [ - '/api/status', '/api/site_info/site_name', '/api/meters/site', - '/api/meters/solar', '/api/sitemaster', '/api/powerwalls', - '/api/customer/registration', '/api/system_status', '/api/system_status/grid_status', - '/api/system/update/status', '/api/site_info', '/api/system_status/grid_faults', - '/api/operation', '/api/site_info/grid_codes', '/api/solars', '/api/solars/brands', - '/api/customer', '/api/meters', '/api/installer', '/api/networks', - '/api/system/networks', '/api/meters/readings', '/api/synchrometer/ct_voltage_references', - '/api/troubleshooting/problems', '/api/auth/toggle/supported', '/api/solar_powerwall', -] -DISABLED = [ +BUILD: Final[str] = "t67" +UTF_8: Final[str] = "utf-8" + +ALLOWLIST: Final[Set[str]] = set([ + '/api/auth/toggle/supported', + '/api/customer', '/api/customer/registration', -] -web_root = os.path.join(os.path.dirname(__file__), "web") + '/api/installer', + '/api/meters', + '/api/meters/readings', + '/api/meters/site', + '/api/meters/solar', + '/api/networks', + '/api/operation', + '/api/powerwalls', + '/api/site_info', + '/api/site_info/grid_codes', + '/api/site_info/site_name', + '/api/sitemaster', + '/api/solar_powerwall', + '/api/solars', + '/api/solars/brands', + '/api/status', + '/api/synchrometer/ct_voltage_references', + '/api/system_status', + '/api/system_status/grid_faults', + '/api/system_status/grid_status', + '/api/system/networks', + '/api/system/update/status', + '/api/troubleshooting/problems', +]) -# Configuration for Proxy - Check for environmental variables -# and always use those if available (required for Docker) -bind_address = os.getenv("PW_BIND_ADDRESS", "") -password = os.getenv("PW_PASSWORD", "") -email = os.getenv("PW_EMAIL", "email@example.com") -host = os.getenv("PW_HOST", "") -timezone = os.getenv("PW_TIMEZONE", "America/Los_Angeles") -debugmode = os.getenv("PW_DEBUG", "no").lower() == "yes" -cache_expire = int(os.getenv("PW_CACHE_EXPIRE", "5")) -browser_cache = int(os.getenv("PW_BROWSER_CACHE", "0")) -timeout = int(os.getenv("PW_TIMEOUT", "5")) -pool_maxsize = int(os.getenv("PW_POOL_MAXSIZE", "15")) -https_mode = os.getenv("PW_HTTPS", "no") -port = int(os.getenv("PW_PORT", "8675")) -style = os.getenv("PW_STYLE", "clear") + ".js" -siteid = os.getenv("PW_SITEID", None) -authpath = os.getenv("PW_AUTH_PATH", "") -authmode = os.getenv("PW_AUTH_MODE", "cookie") -cf = ".powerwall" -if authpath: - cf = os.path.join(authpath, ".powerwall") -cachefile = os.getenv("PW_CACHE_FILE", cf) -control_secret = os.getenv("PW_CONTROL_SECRET", "") -gw_pwd = os.getenv("PW_GW_PWD", None) -neg_solar = os.getenv("PW_NEG_SOLAR", "yes").lower() == "yes" - -# Global Stats -proxystats = { - 'pypowerwall': "%s Proxy %s" % (pypowerwall.version, BUILD), - 'mode': "Unknown", - 'gets': 0, - 'posts': 0, - 'errors': 0, - 'timeout': 0, - 'uri': {}, - 'ts': int(time.time()), - 'start': int(time.time()), - 'clear': int(time.time()), - 'uptime': "", - 'mem': 0, - 'site_name': "", - 'cloudmode': False, - 'fleetapi': False, - 'tedapi': False, - 'pw3': False, - 'tedapi_mode': "off", - 'siteid': None, - 'counter': 0, - 'cf': cachefile, - 'config': { - 'PW_BIND_ADDRESS': bind_address, - 'PW_PASSWORD': '*' * len(password) if password else None, - 'PW_EMAIL': email, - 'PW_HOST': host, - 'PW_TIMEZONE': timezone, - 'PW_DEBUG': debugmode, - 'PW_CACHE_EXPIRE': cache_expire, - 'PW_BROWSER_CACHE': browser_cache, - 'PW_TIMEOUT': timeout, - 'PW_POOL_MAXSIZE': pool_maxsize, - 'PW_HTTPS': https_mode, - 'PW_PORT': port, - 'PW_STYLE': style, - 'PW_SITEID': siteid, - 'PW_AUTH_PATH': authpath, - 'PW_AUTH_MODE': authmode, - 'PW_CACHE_FILE': cachefile, - 'PW_CONTROL_SECRET': '*' * len(control_secret) if control_secret else None, - 'PW_GW_PWD': '*' * len(gw_pwd) if gw_pwd else None, - 'PW_NEG_SOLAR': neg_solar - } -} - -if https_mode == "yes": - # run https mode with self-signed cert - cookiesuffix = "path=/;SameSite=None;Secure;" - httptype = "HTTPS" -elif https_mode == "http": - # run http mode but simulate https for proxy behind https proxy - cookiesuffix = "path=/;SameSite=None;Secure;" - httptype = "HTTP" -else: - # run in http mode - cookiesuffix = "path=/;" - httptype = "HTTP" - -# Logging +DISABLED: Final[Set[str]] = set([ + '/api/customer/registration', + '/networks' +]) +WEB_ROOT: Final[str] = os.path.join(os.path.dirname(__file__), "web") +SERVER_DEBUG: Final[bool] = bool(os.getenv("PW_DEBUG", "no").lower() == "yes") + +# Logging configuration log = logging.getLogger("proxy") logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO) -log.setLevel(logging.INFO) - -if debugmode: - log.info("pyPowerwall [%s] Proxy Server [%s] - %s Port %d - DEBUG" % - (pypowerwall.version, BUILD, httptype, port)) - pypowerwall.set_debug(True) - log.setLevel(logging.DEBUG) -else: - log.info("pyPowerwall [%s] Proxy Server [%s] - %s Port %d" % - (pypowerwall.version, BUILD, httptype, port)) -log.info("pyPowerwall Proxy Started") - -# Check for cache expire time limit below 5s -if cache_expire < 5: - log.warning("Cache expiration set below 5s (PW_CACHE_EXPIRE=%d)" % cache_expire) +log.setLevel(logging.DEBUG if SERVER_DEBUG else logging.INFO) # Signal handler - Exit on SIGTERM # noinspection PyUnusedLocal def sig_term_handle(signum, frame): raise SystemExit -# Register signal handler signal.signal(signal.SIGTERM, sig_term_handle) +if SERVER_DEBUG: + pypowerwall.set_debug(True) + + +class CONFIG_TYPE(StrEnum): + """_summary_ + + Args: + StrEnum (_type_): _description_ + """ + PW_AUTH_MODE = auto() + PW_AUTH_PATH = auto() + PW_BIND_ADDRESS = auto() + PW_BROWSER_CACHE = auto() + PW_CACHE_EXPIRE = auto() + PW_CACHE_FILE = auto() + PW_CONTROL_SECRET = auto() + PW_COOKIE_SUFFIX = auto() + PW_EMAIL = auto() + PW_GW_PWD = auto() + PW_HOST = auto() + PW_HTTP_TYPE = auto() + PW_HTTPS = auto() + PW_NEG_SOLAR = auto() + PW_PASSWORD = auto() + PW_POOL_MAXSIZE = auto() + PW_PORT = auto() + PW_SITEID = auto() + PW_STYLE = auto() + PW_TIMEOUT = auto() + PW_TIMEZONE = auto() + + +class PROXY_STATS_TYPE(StrEnum): + """_summary_ + + Args: + StrEnum (_type_): _description_ + """ + AUTH_MODE = auto() + CF = auto() + CLEAR = auto() + CLOUDMODE = auto() + CONFIG = auto() + COUNTER = auto() + ERRORS = auto() + FLEETAPI = auto() + GETS = auto() + MEM = auto() + MODE = auto() + POSTS = auto() + PW3 = auto() + PYPOWERWALL = auto() + SITE_NAME = auto() + SITEID = auto() + START = auto() + TEDAPI = auto() + TEDAPI_MODE = auto() + TIMEOUT = auto() + TS = auto() + UPTIME = auto() + URI = auto() + +# Configuration for Proxy - Check for environmental variables +# and always use those if available (required for Docker) +# Configuration - Environment variables +type PROXY_CONFIG = Dict[CONFIG_TYPE, str | int | bool | None] +type PROXY_STATS = Dict[PROXY_STATS_TYPE, str | int | bool | None | PROXY_CONFIG | Dict[str, int]] # Get Value Function - Key to Value or Return Null def get_value(a, key): - if key in a: - return a[key] - else: - log.debug("Missing key in payload [%s]" % key) - return None + value = a.get(key) + if value is None: + log.debug(f"Missing key in payload [{key}]") + return value -# Connect to Powerwall -# TODO: Add support for multiple Powerwalls -try: - pw = pypowerwall.Powerwall(host, password, email, timezone, cache_expire, - timeout, pool_maxsize, siteid=siteid, - authpath=authpath, authmode=authmode, - cachefile=cachefile, auto_select=True, - retry_modes=True, gw_pwd=gw_pwd) -except Exception as e: - log.error(e) - log.error("Fatal Error: Unable to connect. Please fix config and restart.") - while True: - try: - time.sleep(5) # Infinite loop to keep container running - except (KeyboardInterrupt, SystemExit): - sys.exit(0) +def configure_pw_control(pw: pypowerwall.Powerwall, configuration: PROXY_CONFIG) -> pypowerwall.Powerwall: + if not configuration[CONFIG_TYPE.PW_CONTROL_SECRET]: + return None -site_name = pw.site_name() or "Unknown" -if pw.cloudmode or pw.fleetapi: - if pw.fleetapi: - proxystats['mode'] = "FleetAPI" - log.info("pyPowerwall Proxy Server - FleetAPI Mode") - else: - proxystats['mode'] = "Cloud" - log.info("pyPowerwall Proxy Server - Cloud Mode") - log.info("Connected to Site ID %s (%s)" % (pw.client.siteid, site_name.strip())) - if siteid is not None and siteid != str(pw.client.siteid): - log.info("Switch to Site ID %s" % siteid) - if not pw.client.change_site(siteid): - log.error("Fatal Error: Unable to connect. Please fix config and restart.") - while True: - try: - time.sleep(5) # Infinite loop to keep container running - except (KeyboardInterrupt, SystemExit): - sys.exit(0) -else: - proxystats['mode'] = "Local" - log.info("pyPowerwall Proxy Server - Local Mode") - log.info("Connected to Energy Gateway %s (%s)" % (host, site_name.strip())) - if pw.tedapi: - proxystats['tedapi'] = True - proxystats['tedapi_mode'] = pw.tedapi_mode - proxystats['pw3'] = pw.tedapi.pw3 - log.info(f"TEDAPI Mode Enabled for Device Vitals ({pw.tedapi_mode})") - -pw_control = None -if control_secret: log.info("Control Commands Activating - WARNING: Use with caution!") try: if pw.cloudmode or pw.fleetapi: pw_control = pw else: - pw_control = pypowerwall.Powerwall("", password, email, siteid=siteid, - authpath=authpath, authmode=authmode, - cachefile=cachefile, auto_select=True) + pw_control = pypowerwall.Powerwall( + "", + configuration[CONFIG_TYPE.PW_PASSWORD], + configuration[CONFIG_TYPE.PW_EMAIL], + siteid=configuration[CONFIG_TYPE.PW_SITEID], + authpath=configuration[CONFIG_TYPE.PW_AUTH_PATH], + authmode=configuration[CONFIG_TYPE.PW_AUTH_MODE], + cachefile=configuration[CONFIG_TYPE.PW_CACHE_FILE], + auto_select=True + ) except Exception as e: - log.error("Control Mode Failed: Unable to connect to cloud - Run Setup") - control_secret = "" + log.error(f"Control Mode Failed {e}: Unable to connect to cloud - Run Setup") + configuration[CONFIG_TYPE.PW_CONTROL_SECRET] = None + return None + if pw_control: log.info(f"Control Mode Enabled: Cloud Mode ({pw_control.mode}) Connected") else: log.error("Control Mode Failed: Unable to connect to cloud - Run Setup") - control_secret = None + configuration[CONFIG_TYPE.PW_CONTROL_SECRET] = None + return None -class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): - daemon_threads = True + return pw_control -# pylint: disable=arguments-differ,global-variable-not-assigned -# noinspection PyPep8Naming class Handler(BaseHTTPRequestHandler): + def __init__(self, request, client_address, server, configuration: PROXY_CONFIG, pw: pypowerwall.Powerwall, pw_control: pypowerwall.Powerwall, proxy_stats: PROXY_STATS): + self.configuration = configuration + self.pw = pw + self.pw_control = pw_control + self.proxystats = proxy_stats + super().__init__(request, client_address, server) + def log_message(self, log_format, *args): - if debugmode: - log.debug("%s %s" % (self.address_string(), log_format % args)) - else: - pass + if SERVER_DEBUG: + log.debug(f"{self.address_string()} {log_format % args}") def address_string(self): # replace function to avoid lookup delays hostaddr, hostport = self.client_address[:2] return hostaddr + def send_json_response(self, data, status_code=HTTPStatus.OK, content_type='application/json') -> str: + response = json.dumps(data) + try: + self.send_response(status_code) + self.send_header('Content-type', content_type) + self.send_header('Content-Length', str(len(response))) + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + if self.wfile.write(response.encode(UTF_8)) > 0: + return response + except Exception as exc: + log.debug(f"Error sending response: {exc}") + self.proxystats[PROXY_STATS_TYPE.ERRORS] = int(self.proxystats[PROXY_STATS_TYPE.ERRORS]) + 1 + return response + + + def handle_control_post(self, self_path) -> bool: + """Handle control POST requests.""" + if not self.pw_control: + self.proxystats[PROXY_STATS_TYPE.ERRORS] += 1 + self.send_json_response( + {"error": "Control Commands Disabled - Set PW_CONTROL_SECRET to enable"}, + status_code=HTTPStatus.BAD_REQUEST + ) + return False + + try: + action = urlparse(self_path).path.split('/')[2] + content_length = int(self.headers.get('Content-Length', 0)) + post_data = self.rfile.read(content_length) + query_params = parse_qs(post_data.decode(UTF_8)) + value = query_params.get('value', [''])[0] + token = query_params.get('token', [''])[0] + except Exception as er: + log.error("Control Command Error: %s", er) + self.send_json_response( + {"error": "Control Command Error: Invalid Request"}, + status_code=HTTPStatus.BAD_REQUEST + ) + return False + + if token != self.configuration[CONFIG_TYPE.PW_CONTROL_SECRET]: + self.send_json_response( + {"unauthorized": "Control Command Token Invalid"}, + status_code=HTTPStatus.UNAUTHORIZED + ) + return False + + if action == 'reserve': + if not value: + self.send_json_response({"reserve": self.pw_control.get_reserve()}) + return True + elif value.isdigit(): + result = self.pw_control.set_reserve(int(value)) + log.info(f"Control Command: Set Reserve to {value}") + self.send_json_response(result) + return True + else: + self.send_json_response( + {"error": "Control Command Value Invalid"}, + status_code=HTTPStatus.BAD_REQUEST + ) + elif action == 'mode': + if not value: + self.send_json_response({"mode": self.pw_control.get_mode()}) + return True + elif value in ['self_consumption', 'backup', 'autonomous']: + result = self.pw_control.set_mode(value) + log.info(f"Control Command: Set Mode to {value}") + self.send_json_response(result) + return True + else: + self.send_json_response( + {"error": "Control Command Value Invalid"}, + status_code=HTTPStatus.BAD_REQUEST + ) + else: + self.send_json_response( + {"error": "Invalid Command Action"}, + status_code=HTTPStatus.BAD_REQUEST + ) + return False + + def do_POST(self): - global proxystats - contenttype = 'application/json' - message = '{"error": "Invalid Request"}' + """Handle POST requests.""" if self.path.startswith('/control'): - # curl -X POST -d "value=20&token=1234" http://localhost:8675/control/reserve - # curl -X POST -d "value=backup&token=1234" http://localhost:8675/control/mode - message = None - if not control_secret: - message = '{"error": "Control Commands Disabled - Set PW_CONTROL_SECRET to enable"}' - else: - try: - action = urlparse(self.path).path.split('/')[2] - post_data = self.rfile.read(int(self.headers['Content-Length'])) - query_params = parse_qs(post_data.decode('utf-8')) - value = query_params.get('value', [''])[0] - token = query_params.get('token', [''])[0] - except Exception as er: - message = '{"error": "Control Command Error: Invalid Request"}' - log.error(f"Control Command Error: {er}") - if not message: - # Check if unable to connect to cloud - if pw_control.client is None: - message = '{"error": "Control Command Error: Unable to connect to cloud mode - Run Setup"}' - log.error("Control Command Error: Unable to connect to cloud mode - Run Setup") - else: - if token == control_secret: - if action == 'reserve': - # ensure value is an integer - if not value: - # return current reserve level in json string - message = '{"reserve": %s}' % pw_control.get_reserve() - elif value.isdigit(): - message = json.dumps(pw_control.set_reserve(int(value))) - log.info(f"Control Command: Set Reserve to {value}") - else: - message = '{"error": "Control Command Value Invalid"}' - elif action == 'mode': - if not value: - # return current mode in json string - message = '{"mode": "%s"}' % pw_control.get_mode() - elif value in ['self_consumption', 'backup', 'autonomous']: - message = json.dumps(pw_control.set_mode(value)) - log.info(f"Control Command: Set Mode to {value}") - else: - message = '{"error": "Control Command Value Invalid"}' - else: - message = '{"error": "Invalid Command Action"}' - else: - message = '{"unauthorized": "Control Command Token Invalid"}' - if "error" in message: - self.send_response(400) - proxystats['errors'] = proxystats['errors'] + 1 - elif "unauthorized" in message: - self.send_response(401) + stat = PROXY_STATS_TYPE.POSTS if self.handle_control_post(self.path) else PROXY_STATS_TYPE.ERRORS + self.proxystats[stat] += 1 else: - self.send_response(200) - proxystats['posts'] = proxystats['posts'] + 1 - self.send_header('Content-type', contenttype) - self.send_header('Content-Length', str(len(message))) - self.send_header("Access-Control-Allow-Origin", "*") - self.end_headers() - self.wfile.write(message.encode("utf8")) + self.send_json_response( + {"error": "Invalid Request"}, + status_code=HTTPStatus.BAD_REQUEST + ) + def do_GET(self): - global proxystats - self.send_response(200) - contenttype = 'application/json' - - if self.path == '/aggregates' or self.path == '/api/meters/aggregates': - # Meters - JSON - aggregates = pw.poll('/api/meters/aggregates') - if not neg_solar and aggregates and 'solar' in aggregates: - solar = aggregates['solar'] - if solar and 'instant_power' in solar and solar['instant_power'] < 0: - solar['instant_power'] = 0 - # Shift energy from solar to load - if 'load' in aggregates and 'instant_power' in aggregates['load']: - aggregates['load']['instant_power'] -= solar['instant_power'] - try: - message = json.dumps(aggregates) - except: - log.error(f"JSON encoding error in payload: {aggregates}") - message = None - elif self.path == '/soe': - # Battery Level - JSON - message: str = pw.poll('/api/system_status/soe', jsonformat=True) - elif self.path == '/api/system_status/soe': - # Force 95% Scale - level = pw.level(scale=True) - message: str = json.dumps({"percentage": level}) - elif self.path == '/api/system_status/grid_status': - # Grid Status - JSON - message: str = pw.poll('/api/system_status/grid_status', jsonformat=True) - elif self.path == '/csv': - # Grid,Home,Solar,Battery,Level - CSV - contenttype = 'text/plain; charset=utf-8' - batterylevel = pw.level() or 0 - grid = pw.grid() or 0 - solar = pw.solar() or 0 - battery = pw.battery() or 0 - home = pw.home() or 0 - if not neg_solar and solar < 0: - solar = 0 - # Shift energy from solar to load - home -= solar - message = "%0.2f,%0.2f,%0.2f,%0.2f,%0.2f\n" \ - % (grid, home, solar, battery, batterylevel) - elif self.path == '/vitals': - # Vitals Data - JSON - message: str = pw.vitals(jsonformat=True) or json.dumps({}) - elif self.path == '/strings': - # Strings Data - JSON - message: str = pw.strings(jsonformat=True) or json.dumps({}) - elif self.path == '/stats': - # Give Internal Stats - proxystats['ts'] = int(time.time()) - delta = proxystats['ts'] - proxystats['start'] - proxystats['uptime'] = str(datetime.timedelta(seconds=delta)) - proxystats['mem'] = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss - proxystats['site_name'] = pw.site_name() - proxystats['cloudmode'] = pw.cloudmode - proxystats['fleetapi'] = pw.fleetapi - if (pw.cloudmode or pw.fleetapi) and pw.client is not None: - proxystats['siteid'] = pw.client.siteid - proxystats['counter'] = pw.client.counter - message: str = json.dumps(proxystats) - elif self.path == '/stats/clear': - # Clear Internal Stats - log.debug("Clear internal stats") - proxystats['gets'] = 0 - proxystats['errors'] = 0 - proxystats['uri'] = {} - proxystats['clear'] = int(time.time()) - message: str = json.dumps(proxystats) - elif self.path == '/temps': - # Temps of Powerwalls - message: str = pw.temps(jsonformat=True) or json.dumps({}) - elif self.path == '/temps/pw': - # Temps of Powerwalls with Simple Keys - pwtemp = {} - idx = 1 - temps = pw.temps() - for i in temps: - key = "PW%d_temp" % idx - pwtemp[key] = temps[i] - idx = idx + 1 - message: str = json.dumps(pwtemp) - elif self.path == '/alerts': - # Alerts - message: str = pw.alerts(jsonformat=True) or json.dumps([]) - elif self.path == '/alerts/pw': - # Alerts in dictionary/object format - pwalerts = {} - alerts = pw.alerts() - if alerts is None: - message: Optional[str] = None - else: - for alert in alerts: - pwalerts[alert] = 1 - message: str = json.dumps(pwalerts) or json.dumps({}) - elif self.path == '/freq': - # Frequency, Current, Voltage and Grid Status - fcv = {} - idx = 1 - # Pull freq, current, voltage of each Powerwall via system_status - d = pw.system_status() or {} - if "battery_blocks" in d: - for block in d["battery_blocks"]: - fcv["PW%d_name" % idx] = None # Placeholder for vitals - fcv["PW%d_PINV_Fout" % idx] = get_value(block, "f_out") - fcv["PW%d_PINV_VSplit1" % idx] = None # Placeholder for vitals - fcv["PW%d_PINV_VSplit2" % idx] = None # Placeholder for vitals - fcv["PW%d_PackagePartNumber" % idx] = get_value(block, "PackagePartNumber") - fcv["PW%d_PackageSerialNumber" % idx] = get_value(block, "PackageSerialNumber") - fcv["PW%d_p_out" % idx] = get_value(block, "p_out") - fcv["PW%d_q_out" % idx] = get_value(block, "q_out") - fcv["PW%d_v_out" % idx] = get_value(block, "v_out") - fcv["PW%d_f_out" % idx] = get_value(block, "f_out") - fcv["PW%d_i_out" % idx] = get_value(block, "i_out") - idx = idx + 1 - # Pull freq, current, voltage of each Powerwall via vitals if available - vitals = pw.vitals() or {} - idx = 1 - for device in vitals: - d = vitals[device] - if device.startswith('TEPINV'): - # PW freq - fcv["PW%d_name" % idx] = device - fcv["PW%d_PINV_Fout" % idx] = get_value(d, 'PINV_Fout') - fcv["PW%d_PINV_VSplit1" % idx] = get_value(d, 'PINV_VSplit1') - fcv["PW%d_PINV_VSplit2" % idx] = get_value(d, 'PINV_VSplit2') - idx = idx + 1 - if device.startswith('TESYNC') or device.startswith('TEMSA'): - # Island and Meter Metrics from Backup Gateway or Backup Switch - for i in d: - if i.startswith('ISLAND') or i.startswith('METER'): - fcv[i] = d[i] - fcv["grid_status"] = pw.grid_status(type="numeric") - message: str = json.dumps(fcv) - elif self.path == '/pod': - # Powerwall Battery Data - pod = {} - # Get Individual Powerwall Battery Data - d = pw.system_status() or {} - if "battery_blocks" in d: - idx = 1 - for block in d["battery_blocks"]: - # Vital Placeholders - pod["PW%d_name" % idx] = None - pod["PW%d_POD_ActiveHeating" % idx] = None - pod["PW%d_POD_ChargeComplete" % idx] = None - pod["PW%d_POD_ChargeRequest" % idx] = None - pod["PW%d_POD_DischargeComplete" % idx] = None - pod["PW%d_POD_PermanentlyFaulted" % idx] = None - pod["PW%d_POD_PersistentlyFaulted" % idx] = None - pod["PW%d_POD_enable_line" % idx] = None - pod["PW%d_POD_available_charge_power" % idx] = None - pod["PW%d_POD_available_dischg_power" % idx] = None - pod["PW%d_POD_nom_energy_remaining" % idx] = None - pod["PW%d_POD_nom_energy_to_be_charged" % idx] = None - pod["PW%d_POD_nom_full_pack_energy" % idx] = None - # Additional System Status Data - pod["PW%d_POD_nom_energy_remaining" % idx] = get_value(block, "nominal_energy_remaining") # map - pod["PW%d_POD_nom_full_pack_energy" % idx] = get_value(block, "nominal_full_pack_energy") # map - pod["PW%d_PackagePartNumber" % idx] = get_value(block, "PackagePartNumber") - pod["PW%d_PackageSerialNumber" % idx] = get_value(block, "PackageSerialNumber") - pod["PW%d_pinv_state" % idx] = get_value(block, "pinv_state") - pod["PW%d_pinv_grid_state" % idx] = get_value(block, "pinv_grid_state") - pod["PW%d_p_out" % idx] = get_value(block, "p_out") - pod["PW%d_q_out" % idx] = get_value(block, "q_out") - pod["PW%d_v_out" % idx] = get_value(block, "v_out") - pod["PW%d_f_out" % idx] = get_value(block, "f_out") - pod["PW%d_i_out" % idx] = get_value(block, "i_out") - pod["PW%d_energy_charged" % idx] = get_value(block, "energy_charged") - pod["PW%d_energy_discharged" % idx] = get_value(block, "energy_discharged") - pod["PW%d_off_grid" % idx] = int(get_value(block, "off_grid") or 0) - pod["PW%d_vf_mode" % idx] = int(get_value(block, "vf_mode") or 0) - pod["PW%d_wobble_detected" % idx] = int(get_value(block, "wobble_detected") or 0) - pod["PW%d_charge_power_clamped" % idx] = int(get_value(block, "charge_power_clamped") or 0) - pod["PW%d_backup_ready" % idx] = int(get_value(block, "backup_ready") or 0) - pod["PW%d_OpSeqState" % idx] = get_value(block, "OpSeqState") - pod["PW%d_version" % idx] = get_value(block, "version") - idx = idx + 1 - # Augment with Vitals Data if available - vitals = pw.vitals() or {} - idx = 1 - for device in vitals: - v = vitals[device] - if device.startswith('TEPOD'): - pod["PW%d_name" % idx] = device - pod["PW%d_POD_ActiveHeating" % idx] = int(get_value(v, 'POD_ActiveHeating') or 0) - pod["PW%d_POD_ChargeComplete" % idx] = int(get_value(v, 'POD_ChargeComplete') or 0) - pod["PW%d_POD_ChargeRequest" % idx] = int(get_value(v, 'POD_ChargeRequest') or 0) - pod["PW%d_POD_DischargeComplete" % idx] = int(get_value(v, 'POD_DischargeComplete') or 0) - pod["PW%d_POD_PermanentlyFaulted" % idx] = int(get_value(v, 'POD_PermanentlyFaulted') or 0) - pod["PW%d_POD_PersistentlyFaulted" % idx] = int(get_value(v, 'POD_PersistentlyFaulted') or 0) - pod["PW%d_POD_enable_line" % idx] = int(get_value(v, 'POD_enable_line') or 0) - pod["PW%d_POD_available_charge_power" % idx] = get_value(v, 'POD_available_charge_power') - pod["PW%d_POD_available_dischg_power" % idx] = get_value(v, 'POD_available_dischg_power') - pod["PW%d_POD_nom_energy_remaining" % idx] = get_value(v, 'POD_nom_energy_remaining') - pod["PW%d_POD_nom_energy_to_be_charged" % idx] = get_value(v, 'POD_nom_energy_to_be_charged') - pod["PW%d_POD_nom_full_pack_energy" % idx] = get_value(v, 'POD_nom_full_pack_energy') - idx = idx + 1 - # Aggregate data - pod["nominal_full_pack_energy"] = get_value(d, 'nominal_full_pack_energy') - pod["nominal_energy_remaining"] = get_value(d, 'nominal_energy_remaining') - pod["time_remaining_hours"] = pw.get_time_remaining() - pod["backup_reserve_percent"] = pw.get_reserve() - message: str = json.dumps(pod) - elif self.path == '/version': - # Firmware Version - version = pw.version() - v = {} - if version is None: - v["version"] = "SolarOnly" - v["vint"] = 0 - message: str = json.dumps(v) - else: - v["version"] = version - v["vint"] = parse_version(version) - message: str = json.dumps(v) - elif self.path == '/help': - # Display friendly help screen link and stats - proxystats['ts'] = int(time.time()) - delta = proxystats['ts'] - proxystats['start'] - proxystats['uptime'] = str(datetime.timedelta(seconds=delta)) - proxystats['mem'] = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss - proxystats['site_name'] = pw.site_name() - proxystats['cloudmode'] = pw.cloudmode - proxystats['fleetapi'] = pw.fleetapi - if (pw.cloudmode or pw.fleetapi) and pw.client is not None: - proxystats['siteid'] = pw.client.siteid - proxystats['counter'] = pw.client.counter - proxystats['authmode'] = pw.authmode - contenttype = 'text/html' - message: str = """ - \n
\n - \n - \n - \n\nStat | Value | ||
---|---|---|---|
{i} | {proxystats[i]} | ||
URI: {i} | {proxystats["uri"][i]} | ||
Config: | -
-
-
- Click to view-
|
-
Page refresh: {str(datetime.datetime.fromtimestamp(time.time()))}
\n\n' - elif self.path == '/api/troubleshooting/problems': - # Simulate old API call and respond with empty list - message = '{"problems": []}' - # message = pw.poll('/api/troubleshooting/problems') or '{"problems": []}' - elif self.path.startswith('/tedapi'): - # TEDAPI Specific Calls - if pw.tedapi: - message = '{"error": "Use /tedapi/config, /tedapi/status, /tedapi/components, /tedapi/battery, /tedapi/controller"}' - if self.path == '/tedapi/config': - message = json.dumps(pw.tedapi.get_config()) - if self.path == '/tedapi/status': - message = json.dumps(pw.tedapi.get_status()) - if self.path == '/tedapi/components': - message = json.dumps(pw.tedapi.get_components()) - if self.path == '/tedapi/battery': - message = json.dumps(pw.tedapi.get_battery_blocks()) - if self.path == '/tedapi/controller': - message = json.dumps(pw.tedapi.get_device_controller()) - else: - message = '{"error": "TEDAPI not enabled"}' - elif self.path.startswith('/cloud'): - # Cloud API Specific Calls - if pw.cloudmode and not pw.fleetapi: - message = '{"error": "Use /cloud/battery, /cloud/power, /cloud/config"}' - if self.path == '/cloud/battery': - message = json.dumps(pw.client.get_battery()) - if self.path == '/cloud/power': - message = json.dumps(pw.client.get_site_power()) - if self.path == '/cloud/config': - message = json.dumps(pw.client.get_site_config()) - else: - message = '{"error": "Cloud API not enabled"}' - elif self.path.startswith('/fleetapi'): - # FleetAPI Specific Calls - if pw.fleetapi: - message = '{"error": "Use /fleetapi/info, /fleetapi/status"}' - if self.path == '/fleetapi/info': - message = json.dumps(pw.client.get_site_info()) - if self.path == '/fleetapi/status': - message = json.dumps(pw.client.get_live_status()) - else: - message = '{"error": "FleetAPI not enabled"}' - elif self.path in DISABLED: - # Disabled API Calls - message = '{"status": "404 Response - API Disabled"}' - elif self.path in ALLOWLIST: - # Allowed API Calls - Proxy to Powerwall - message: str = pw.poll(self.path, jsonformat=True) - elif self.path.startswith('/control/reserve'): + """Handle GET requests.""" + path = self.path + + # Map paths to handler functions + path_handlers = { + '/aggregates': self.handle_aggregates, + '/api/meters/aggregates': self.handle_aggregates, + '/soe': self.handle_soe, + '/api/system_status/soe': self.handle_soe_scaled, + '/api/system_status/grid_status': self.handle_grid_status, + '/csv': self.handle_csv, + '/vitals': self.handle_vitals, + '/strings': self.handle_strings, + '/stats': self.handle_stats, + '/stats/clear': self.handle_stats_clear, + '/temps': self.handle_temps, + '/temps/pw': self.handle_temps_pw, + '/alerts': self.handle_alerts, + '/alerts/pw': self.handle_alerts_pw, + '/freq': self.handle_freq, + '/pod': self.handle_pod, + '/version': self.handle_version, + '/help': self.handle_help, + '/api/troubleshooting/problems': self.handle_problems, + } + + result: str = "" + if path in path_handlers: + result = path_handlers[path]() + elif path in DISABLED: + result = self.send_json_response( + {"status": "404 Response - API Disabled"} + ) + elif path in ALLOWLIST: + result = self.handle_allowlist(path) + elif path.startswith('/tedapi'): + result = self.handle_tedapi(path) + elif path.startswith('/cloud'): + result = self.handle_cloud(path) + elif path.startswith('/fleetapi'): + result = self.handle_fleetapi(path) + elif path.startswith('/control/reserve'): # Current battery reserve level - if not pw_control: + if not self.pw_control: message = '{"error": "Control Commands Disabled - Set PW_CONTROL_SECRET to enable"}' else: - message = '{"reserve": %s}' % pw_control.get_reserve() - elif self.path.startswith('/control/mode'): + message = '{"reserve": %s}' % self.pw_control.get_reserve() + result = self.send_json_response(json.loads(message)) + elif path.startswith('/control/mode'): # Current operating mode - if not pw_control: + if not self.pw_control: message = '{"error": "Control Commands Disabled - Set PW_CONTROL_SECRET to enable"}' else: - message = '{"mode": "%s"}' % pw_control.get_mode() + message = '{"mode": "%s"}' % self.pw_control.get_mode() + result = self.send_json_response(json.loads(message)) else: - # Everything else - Set auth headers required for web application - proxystats['gets'] = proxystats['gets'] + 1 - if pw.authmode == "token": - # Create bogus cookies - self.send_header("Set-Cookie", f"AuthCookie=1234567890;{cookiesuffix}") - self.send_header("Set-Cookie", f"UserRecord=1234567890;{cookiesuffix}") - else: - self.send_header("Set-Cookie", f"AuthCookie={pw.client.auth['AuthCookie']};{cookiesuffix}") - self.send_header("Set-Cookie", f"UserRecord={pw.client.auth['UserRecord']};{cookiesuffix}") - - # Serve static assets from web root first, if found. - # pylint: disable=attribute-defined-outside-init - if self.path == "/" or self.path == "": - self.path = "/index.html" - fcontent, ftype = get_static(web_root, self.path) - # Replace {VARS} with current data - status = pw.status() - # convert fcontent to string - fcontent = fcontent.decode("utf-8") - # fix the following variables that if they are None, return "" - fcontent = fcontent.replace("{VERSION}", status["version"] or "") - fcontent = fcontent.replace("{HASH}", status["git_hash"] or "") - fcontent = fcontent.replace("{EMAIL}", email) - fcontent = fcontent.replace("{STYLE}", style) - # convert fcontent back to bytes - fcontent = bytes(fcontent, 'utf-8') - else: - fcontent, ftype = get_static(web_root, self.path) - if fcontent: - log.debug("Served from local web root: {} type {}".format(self.path, ftype)) - # If not found, serve from Powerwall web server - elif pw.cloudmode or pw.fleetapi: - log.debug("Cloud Mode - File not found: {}".format(self.path)) - fcontent = bytes("Not Found", 'utf-8') - ftype = "text/plain" - else: - # Proxy request to Powerwall web server. - proxy_path = self.path - if proxy_path.startswith("/"): - proxy_path = proxy_path[1:] - pw_url = "https://{}/{}".format(pw.host, proxy_path) - log.debug("Proxy request to: {}".format(pw_url)) - try: - if pw.authmode == "token": - r = pw.client.session.get( - url=pw_url, - headers=pw.auth, - verify=False, - stream=True, - timeout=pw.timeout - ) - else: - r = pw.client.session.get( - url=pw_url, - cookies=pw.auth, - verify=False, - stream=True, - timeout=pw.timeout - ) - fcontent = r.content - ftype = r.headers['content-type'] - except AttributeError: - # Display 404 - log.debug("File not found: {}".format(self.path)) - fcontent = bytes("Not Found", 'utf-8') - ftype = "text/plain" - - # Allow browser caching, if user permits, only for CSS, JavaScript and PNG images... - if browser_cache > 0 and (ftype == 'text/css' or ftype == 'application/javascript' or ftype == 'image/png'): - self.send_header("Cache-Control", "max-age={}".format(browser_cache)) + result = self.handle_static_content(path) + + if result is None or result == "": + self.proxystats[PROXY_STATS_TYPE.TIMEOUT] += 1 + elif result == "ERROR!": + self.proxystats[PROXY_STATS_TYPE.ERRORS] += 1 + else: + self.proxystats[PROXY_STATS_TYPE.GETS] += 1 + if path in self.proxystats[PROXY_STATS_TYPE.URI]: + self.proxystats[PROXY_STATS_TYPE.URI][path] += 1 else: - self.send_header("Cache-Control", "no-cache, no-store") + self.proxystats[PROXY_STATS_TYPE.URI][path] = 1 - # Inject transformations - if self.path.split('?')[0] == "/": - if os.path.exists(os.path.join(web_root, style)): - fcontent = bytes(inject_js(fcontent, style), 'utf-8') - self.send_header('Content-type', '{}'.format(ftype)) - self.end_headers() + def handle_aggregates(self) -> str: + # Meters - JSON + aggregates = self.pw.poll('/api/meters/aggregates') + if not self.configuration[CONFIG_TYPE.PW_NEG_SOLAR] and aggregates and 'solar' in aggregates: + solar = aggregates['solar'] + if solar and 'instant_power' in solar and solar['instant_power'] < 0: + solar['instant_power'] = 0 + # Shift energy from solar to load + if 'load' in aggregates and 'instant_power' in aggregates['load']: + aggregates['load']['instant_power'] -= solar['instant_power'] + return self.send_json_response(aggregates) + + + def handle_soe(self) -> str: + soe = self.pw.poll('/api/system_status/soe', jsonformat=True) + return self.send_json_response(json.loads(soe)) + + + def handle_soe_scaled(self) -> str: + level = self.pw.level(scale=True) + return self.send_json_response({"percentage": level}) + + + def handle_grid_status(self) -> str: + grid_status = self.pw.poll('/api/system_status/grid_status', jsonformat=True) + return self.send_json_response(json.loads(grid_status)) + + + def handle_csv(self) -> str: + # Grid,Home,Solar,Battery,Level - CSV + batterylevel = self.pw.level() or 0 + grid = self.pw.grid() or 0 + solar = self.pw.solar() or 0 + battery = self.pw.battery() or 0 + home = self.pw.home() or 0 + if not self.configuration[CONFIG_TYPE.PW_NEG_SOLAR] and solar < 0: + solar = 0 + # Shift energy from solar to load + home -= solar + message = f"{grid:.2f},{home:.2f},{solar:.2f},{battery:.2f},{batterylevel:.2f}\n" + return self.send_json_response(message) + + + def handle_vitals(self) -> str: + vitals = self.pw.vitals(jsonformat=True) or {} + return self.send_json_response(json.loads(vitals)) + + + def handle_strings(self) -> str: + strings = self.pw.strings(jsonformat=True) or {} + din_suffix = self.pw.din()[-2:] + output = {f"{key}_{din_suffix}":value for key, value in json.loads(strings).items()} + return self.send_json_response(output) + + + def handle_stats(self) -> str: + self.proxystats.update({ + PROXY_STATS_TYPE.TS: int(time.time()), + PROXY_STATS_TYPE.UPTIME: str(datetime.timedelta(seconds=(float(self.proxystats[PROXY_STATS_TYPE.TS]) - float(self.proxystats[PROXY_STATS_TYPE.START])))), + PROXY_STATS_TYPE.MEM: resource.getrusage(resource.RUSAGE_SELF).ru_maxrss, + PROXY_STATS_TYPE.SITE_NAME: self.pw.site_name(), + PROXY_STATS_TYPE.CLOUDMODE: self.pw.cloudmode, + PROXY_STATS_TYPE.FLEETAPI: self.pw.fleetapi, + PROXY_STATS_TYPE.AUTH_MODE: self.pw.authmode + }) + if (self.pw.cloudmode or self.pw.fleetapi) and self.pw.client: + self.proxystats[PROXY_STATS_TYPE.SITEID] = self.pw.client.siteid + self.proxystats[PROXY_STATS_TYPE.COUNTER] = self.pw.client.counter + return self.send_json_response(self.proxystats) + + + def handle_stats_clear(self) -> str: + # Clear Internal Stats + log.debug("Clear internal stats") + self.proxystats.update({ + PROXY_STATS_TYPE.GETS: 0, + PROXY_STATS_TYPE.ERRORS: 0, + PROXY_STATS_TYPE.URI: {}, + PROXY_STATS_TYPE.CLEAR: int(time.time()), + }) + return self.send_json_response(self.proxystats) + + + def handle_temps(self) -> str: + temps = self.pw.temps(jsonformat=True) or {} + return self.send_json_response(json.loads(temps)) + + + def handle_temps_pw(self) -> str: + temps = self.pw.temps() or {} + pw_temp = {f"PW{idx}_temp": temp for idx, temp in enumerate(temps.values(), 1)} + return self.send_json_response(pw_temp) + + + def handle_alerts(self) -> str: + alerts = self.pw.alerts(jsonformat=True) or [] + return self.send_json_response(alerts) + + + def handle_alerts_pw(self) -> str: + alerts = self.pw.alerts() or [] + pw_alerts = {alert: 1 for alert in alerts} + return self.send_json_response(pw_alerts) + + + def handle_freq(self) -> str: + fcv = {} + system_status = self.pw.system_status() or {} + blocks = system_status.get("battery_blocks", []) + for idx, block in enumerate(blocks, 1): + fcv.update({ + f"PW{idx}_name": None, + f"PW{idx}_PINV_Fout": get_value(block, "f_out"), + f"PW{idx}_PINV_VSplit1": None, + f"PW{idx}_PINV_VSplit2": None, + f"PW{idx}_PackagePartNumber": get_value(block, "PackagePartNumber"), + f"PW{idx}_PackageSerialNumber": get_value(block, "PackageSerialNumber"), + f"PW{idx}_p_out": get_value(block, "p_out"), + f"PW{idx}_q_out": get_value(block, "q_out"), + f"PW{idx}_v_out": get_value(block, "v_out"), + f"PW{idx}_f_out": get_value(block, "f_out"), + f"PW{idx}_i_out": get_value(block, "i_out"), + }) + vitals = self.pw.vitals() or {} + for idx, (device, data) in enumerate(vitals.items()): + if device.startswith('TEPINV'): + fcv.update({ + f"PW{idx}_name": device, + f"PW{idx}_PINV_Fout": get_value(data, 'PINV_Fout'), + f"PW{idx}_PINV_VSplit1": get_value(data, 'PINV_VSplit1'), + f"PW{idx}_PINV_VSplit2": get_value(data, 'PINV_VSplit2') + }) + if device.startswith(('TESYNC', 'TEMSA')): + fcv.update({key: value for key, value in data.items() if key.startswith(('ISLAND', 'METER'))}) + fcv["grid_status"] = self.pw.grid_status(type="numeric") + return self.send_json_response(fcv) + + + def handle_pod(self) -> str: + # Powerwall Battery Data + pod = {} + # Get Individual Powerwall Battery Data + system_status = self.pw.system_status() or {} + blocks = system_status.get("battery_blocks", []) + for idx, block in enumerate(blocks, 1): + pod.update({ + # Vital Placeholders + f"PW{idx}_name": None, + f"PW{idx}_POD_ActiveHeating": None, + f"PW{idx}_POD_ChargeComplete": None, + f"PW{idx}_POD_ChargeRequest": None, + f"PW{idx}_POD_DischargeComplete": None, + f"PW{idx}_POD_PermanentlyFaulted": None, + f"PW{idx}_POD_PersistentlyFaulted": None, + f"PW{idx}_POD_enable_line": None, + f"PW{idx}_POD_available_charge_power": None, + f"PW{idx}_POD_available_dischg_power": None, + f"PW{idx}_POD_nom_energy_remaining": None, + f"PW{idx}_POD_nom_energy_to_be_charged": None, + f"PW{idx}_POD_nom_full_pack_energy": None, + # Additional System Status Data + f"PW{idx}_POD_nom_energy_remaining": get_value(block, "nominal_energy_remaining"), # map + f"PW{idx}_POD_nom_full_pack_energy": get_value(block, "nominal_full_pack_energy"), # map + f"PW{idx}_PackagePartNumber": get_value(block, "PackagePartNumber"), + f"PW{idx}_PackageSerialNumber": get_value(block, "PackageSerialNumber"), + f"PW{idx}_pinv_state": get_value(block, "pinv_state"), + f"PW{idx}_pinv_grid_state": get_value(block, "pinv_grid_state"), + f"PW{idx}_p_out": get_value(block, "p_out"), + f"PW{idx}_q_out": get_value(block, "q_out"), + f"PW{idx}_v_out": get_value(block, "v_out"), + f"PW{idx}_f_out": get_value(block, "f_out"), + f"PW{idx}_i_out": get_value(block, "i_out"), + f"PW{idx}_energy_charged": get_value(block, "energy_charged"), + f"PW{idx}_energy_discharged": get_value(block, "energy_discharged"), + f"PW{idx}_off_grid": int(get_value(block, "off_grid") or 0), + f"PW{idx}_vf_mode": int(get_value(block, "vf_mode") or 0), + f"PW{idx}_wobble_detected": int(get_value(block, "wobble_detected") or 0), + f"PW{idx}_charge_power_clamped": int(get_value(block, "charge_power_clamped") or 0), + f"PW{idx}_backup_ready": int(get_value(block, "backup_ready") or 0), + f"PW{idx}_OpSeqState": get_value(block, "OpSeqState"), + f"PW{idx}_version": get_value(block, "version") + }) + + vitals = self.pw.vitals() or {} + for idx, (device, data) in enumerate(vitals.items(), 1): + if not device.startswith('TEPOD'): + continue + pod.update({ + f"PW{idx}_name": device, + f"PW{idx}_POD_ActiveHeating": int(get_value(data, 'POD_ActiveHeating') or 0), + f"PW{idx}_POD_ChargeComplete": int(get_value(data, 'POD_ChargeComplete') or 0), + f"PW{idx}_POD_ChargeRequest": int(get_value(data, 'POD_ChargeRequest') or 0), + f"PW{idx}_POD_DischargeComplete": int(get_value(data, 'POD_DischargeComplete') or 0), + f"PW{idx}_POD_PermanentlyFaulted": int(get_value(data, 'POD_PermanentlyFaulted') or 0), + f"PW{idx}_POD_PersistentlyFaulted": int(get_value(data, 'POD_PersistentlyFaulted') or 0), + f"PW{idx}_POD_enable_line": int(get_value(data, 'POD_enable_line') or 0), + f"PW{idx}_POD_available_charge_power": get_value(data, 'POD_available_charge_power'), + f"PW{idx}_POD_available_dischg_power": get_value(data, 'POD_available_dischg_power'), + f"PW{idx}_POD_nom_energy_remaining": get_value(data, 'POD_nom_energy_remaining'), + f"PW{idx}_POD_nom_energy_to_be_charged": get_value(data, 'POD_nom_energy_to_be_charged'), + f"PW{idx}_POD_nom_full_pack_energy": get_value(data, 'POD_nom_full_pack_energy') + }) + + pod.update({ + "nominal_full_pack_energy": get_value(system_status, 'nominal_full_pack_energy'), + "nominal_energy_remaining": get_value(system_status, 'nominal_energy_remaining'), + "time_remaining_hours": self.pw.get_time_remaining(), + "backup_reserve_percent": self.pw.get_reserve() + }) + return self.send_json_response(pod) + + + def handle_version(self) -> str: + version = self.pw.version() + r = {"version": "SolarOnly", "vint": 0} if version is None else {"version": version, "vint": parse_version(version)} + return self.send_json_response(r) + + + def handle_help(self) -> str: + self.send_response(HTTPStatus.OK) + self.send_header('Content-type', 'text/html') + self.end_headers() + # Display friendly help screen link and stats + + self.proxystats.update({ + PROXY_STATS_TYPE.TS: int(time.time()), + PROXY_STATS_TYPE.UPTIME: str(datetime.timedelta(seconds=int(time.time()) - self.proxystats[PROXY_STATS_TYPE.START])), + PROXY_STATS_TYPE.MEM: resource.getrusage(resource.RUSAGE_SELF).ru_maxrss, + PROXY_STATS_TYPE.SITE_NAME: self.pw.site_name(), + PROXY_STATS_TYPE.CLOUDMODE: self.pw.cloudmode, + PROXY_STATS_TYPE.FLEETAPI: self.pw.fleetapi, + }) + + if (self.pw.cloudmode or self.pw.fleetapi) and self.pw.client is not None: + self.proxystats[PROXY_STATS_TYPE.SITEID] = self.pw.client.siteid + self.proxystats[PROXY_STATS_TYPE.COUNTER] = self.pw.client.counter + self.proxystats[PROXY_STATS_TYPE.AUTH_MODE] = self.pw.authmode + message = f""" + + + + + + +Stat | Value | ||
---|---|---|---|
{key} | {value} | ||
URI: {uri} | {count} | ||
Config: | +
+
+
+ Click to view+
|
+
Page refresh: {str(datetime.datetime.fromtimestamp(time.time()))}
\n\n' + return self.wfile.write(message.encode(UTF_8)) + + + def handle_problems(self) -> str: + return self.send_json_response({"problems": []}) + + def handle_allowlist(self, path) -> str: + response = self.pw.poll(path, jsonformat=True) + return self.send_json_response(json.loads(response)) + + def handle_tedapi(self, path) -> str: + if not self.pw.tedapi: + return self.send_json_response({"error": "TEDAPI not enabled"}, status_code=HTTPStatus.BAD_REQUEST) + + commands = { + '/tedapi/config': self.pw.tedapi.get_config, + '/tedapi/status': self.pw.tedapi.get_status, + '/tedapi/components': self.pw.tedapi.get_components, + '/tedapi/battery': self.pw.tedapi.get_battery_blocks, + '/tedapi/controller': self.pw.tedapi.get_device_controller, + } + command = commands.get(path) + if command: + return self.send_json_response(command()) + + return self.send_json_response( + {"error": "Use /tedapi/config, /tedapi/status, /tedapi/components, /tedapi/battery, /tedapi/controller"}, + status_code=HTTPStatus.BAD_REQUEST + ) + + + def handle_cloud(self, path) -> str: + if not self.pw.cloudmode or self.pw.fleetapi: + return self.send_json_response({"error": "Cloud API not enabled"}, status_code=HTTPStatus.BAD_REQUEST) + + commands = { + '/cloud/battery': self.pw.client.get_battery, + '/cloud/power': self.pw.client.get_site_power, + '/cloud/config': self.pw.client.get_site_config, + } + command = commands.get(path) + if command: + return self.send_json_response(command()) + return self.send_json_response({"error": "Use /cloud/battery, /cloud/power, /cloud/config"}, status_code=HTTPStatus.BAD_REQUEST) + + + def handle_fleetapi(self, path) -> str: + if not self.pw.fleetapi: + return self.send_json_response({"error": "FleetAPI not enabled"}, status_code=HTTPStatus.BAD_REQUEST) + + commands = { + '/fleetapi/info': self.pw.client.get_site_info, + '/fleetapi/status': self.pw.client.get_live_status, + } + command = commands.get(path) + if command: + return self.send_json_response(command()) + + return self.send_json_response({"error": "Use /fleetapi/info, /fleetapi/status"}, status_code=HTTPStatus.BAD_REQUEST) + + + def handle_static_content(self, path) -> str: + self.proxystats[PROXY_STATS_TYPE.GETS] += 1 + self.send_response(HTTPStatus.OK) + if self.pw.authmode == "token": + self.send_header("Set-Cookie", f"AuthCookie=1234567890;{self.configuration[CONFIG_TYPE.PW_COOKIE_SUFFIX]}") + self.send_header("Set-Cookie", f"UserRecord=1234567890;{self.configuration[CONFIG_TYPE.PW_COOKIE_SUFFIX]}") + else: + auth = self.pw.client.auth + self.send_header("Set-Cookie", f"AuthCookie={auth['AuthCookie']};{self.configuration[CONFIG_TYPE.PW_COOKIE_SUFFIX]}") + self.send_header("Set-Cookie", f"UserRecord={auth['UserRecord']};{self.configuration[CONFIG_TYPE.PW_COOKIE_SUFFIX]}") + + if path == "/" or path == "": + path = "/index.html" + content, content_type = get_static(WEB_ROOT, path) + status = self.pw.status() + content = content.decode(UTF_8) + # fix the following variables that if they are None, return "" + content = content.replace("{VERSION}", status.get("version", "") or "") + content = content.replace("{HASH}", status.get("git_hash", "") or "") + content = content.replace("{EMAIL}", self.configuration.get(CONFIG_TYPE.PW_EMAIL, "") or "") + content = content.replace("{STYLE}", self.configuration.get(CONFIG_TYPE.PW_STYLE, "") or "") + # convert fcontent back to bytes + content = bytes(content, UTF_8) + else: + content, content_type = get_static(WEB_ROOT, path) + + if content: + log.debug("Served from local web root: {} type {}".format(path, content_type)) + # If not found, serve from Powerwall web server + elif self.pw.cloudmode or self.pw.fleetapi: + log.debug(f"Cloud Mode - File not found: {path}") + content = bytes("Not Found", UTF_8) + content_type = "text/plain" + else: + # Proxy request to Powerwall web server. + pw_url = f"https://{self.pw.host}/{path.lstrip('/')}" + log.debug(f"Proxy request to: {pw_url}") try: - self.wfile.write(fcontent) + session = self.pw.client.session + response = session.get( + url=pw_url, + headers=self.pw.auth if self.pw.authmode == "token" else None, + cookies=None if self.pw.authmode == "token" else self.pw.auth, + verify=False, + stream=True, + timeout=self.pw.timeout, + ) + content = response.content + content_type = response.headers.get('content-type', 'text/html') except Exception as exc: - if "Broken pipe" in str(exc): - log.debug(f"Client disconnected before payload sent [doGET]: {exc}") - return - log.error(f"Error occured while sending PROXY response to client [doGET]: {exc}") - return - - # Count - if message is None: - proxystats['timeout'] = proxystats['timeout'] + 1 - message = "TIMEOUT!" - elif message == "ERROR!": - proxystats['errors'] = proxystats['errors'] + 1 - message = "ERROR!" + log.error("Error proxying request: %s", exc) + content = b"Error during proxy" + content_type = "text/plain" + + if self.configuration[CONFIG_TYPE.PW_BROWSER_CACHE] > 0 and content_type in ['text/css', 'application/javascript', 'image/png']: + self.send_header("Cache-Control", f"max-age={self.configuration[CONFIG_TYPE.PW_BROWSER_CACHE]}") else: - proxystats['gets'] = proxystats['gets'] + 1 - if self.path in proxystats['uri']: - proxystats['uri'][self.path] = proxystats['uri'][self.path] + 1 - else: - proxystats['uri'][self.path] = 1 + self.send_header("Cache-Control", "no-cache, no-store") + + if path.split('?')[0] == "/": + if os.path.exists(os.path.join(WEB_ROOT, self.configuration[CONFIG_TYPE.PW_STYLE])): + content = bytes(inject_js(content, self.configuration[CONFIG_TYPE.PW_STYLE]), UTF_8) - # Send headers and payload + self.send_header('Content-type', content_type) + self.end_headers() try: - self.send_header('Content-type', contenttype) - self.send_header('Content-Length', str(len(message))) - self.send_header("Access-Control-Allow-Origin", "*") - self.end_headers() - self.wfile.write(message.encode("utf8")) + self.wfile.write(content) except Exception as exc: - log.debug(f"Socket broken sending API response to client [doGET]: {exc}") + if "Broken pipe" in str(exc): + log.debug(f"Client disconnected before payload sent [doGET]: {exc}") + return content + log.error(f"Error occured while sending PROXY response to client [doGET]: {exc}") + return content -# noinspection PyTypeChecker -with ThreadingHTTPServer((bind_address, port), Handler) as server: - if https_mode == "yes": - # Activate HTTPS - log.debug("Activating HTTPS") - # pylint: disable=deprecated-method - server.socket = ssl.wrap_socket(server.socket, - certfile=os.path.join(os.path.dirname(__file__), 'localhost.pem'), - server_side=True, ssl_version=ssl.PROTOCOL_TLSv1_2, ca_certs=None, - do_handshake_on_connect=True) +def check_for_environmental_pw_configs() -> List[str]: + """ + Checks for environment variables with specific suffix patterns and returns the list of matching suffixes. - # noinspection PyBroadException - try: - server.serve_forever() - except (Exception, KeyboardInterrupt, SystemExit): - print(' CANCEL \n') + This function iterates over predefined suffixes (starting with an empty string and "1") to check if any + configuration-related environment variables are defined. If a match is found, the suffix is added to the + result list. For numeric suffixes, the function dynamically generates and checks the next numeric suffix. + + Returns: + List[str]: A list of suffixes for which environment variables were found. + + Notes: + - Environment variable names are constructed by appending the suffix to the `value` attribute of each + item in `CONFIG_TYPE`. + - The function dynamically appends numeric suffixes based on the highest found numeric suffix. + """ + suffixes_to_check = {"", "1"} + actual_configs = [] + + environment = [key.lower() for key in os.environ] + while suffixes_to_check: + current_suffix = suffixes_to_check.pop() + test_suffix = f"_{current_suffix}" if current_suffix.isnumeric() else current_suffix + if any(f"{config.value}{test_suffix}" in environment for config in CONFIG_TYPE): + actual_configs.append(test_suffix) + elif current_suffix.isnumeric() and int(current_suffix) > 1: + break + if current_suffix.isnumeric(): + next_suffix = str(int(current_suffix) + 1) + if next_suffix not in suffixes_to_check: + suffixes_to_check.add(next_suffix) + + return actual_configs + + +def read_env_configs() -> List[PROXY_CONFIG]: + suffixes = check_for_environmental_pw_configs() + configs: List[PROXY_CONFIG] = [] + for s in suffixes: + def get_env_value(config_type: CONFIG_TYPE, default: str | None) -> str | None: + """Helper function to construct environment variable names and retrieve their values.""" + env_var = f"{config_type.value}{s}".upper() + return os.getenv(env_var, default) + + config: PROXY_CONFIG = { + CONFIG_TYPE.PW_AUTH_MODE: get_env_value(CONFIG_TYPE.PW_AUTH_MODE, "cookie"), + CONFIG_TYPE.PW_AUTH_PATH: get_env_value(CONFIG_TYPE.PW_AUTH_PATH, ""), + CONFIG_TYPE.PW_BIND_ADDRESS: get_env_value(CONFIG_TYPE.PW_BIND_ADDRESS, ""), + CONFIG_TYPE.PW_BROWSER_CACHE: int(get_env_value(CONFIG_TYPE.PW_BROWSER_CACHE, "0")), + CONFIG_TYPE.PW_CACHE_EXPIRE: int(get_env_value(CONFIG_TYPE.PW_CACHE_EXPIRE, "5")), + CONFIG_TYPE.PW_CACHE_FILE: get_env_value(CONFIG_TYPE.PW_CACHE_FILE, ""), + CONFIG_TYPE.PW_CONTROL_SECRET: get_env_value(CONFIG_TYPE.PW_CONTROL_SECRET, ""), + CONFIG_TYPE.PW_EMAIL: get_env_value(CONFIG_TYPE.PW_EMAIL, "email@example.com"), + CONFIG_TYPE.PW_GW_PWD: get_env_value(CONFIG_TYPE.PW_GW_PWD, None), + CONFIG_TYPE.PW_HOST: get_env_value(CONFIG_TYPE.PW_HOST, ""), + CONFIG_TYPE.PW_HTTPS: (get_env_value(CONFIG_TYPE.PW_HTTPS, "no") or "no").lower(), + CONFIG_TYPE.PW_NEG_SOLAR: bool((get_env_value(CONFIG_TYPE.PW_NEG_SOLAR, "yes") or "yes").lower() == "yes"), + CONFIG_TYPE.PW_PASSWORD: get_env_value(CONFIG_TYPE.PW_PASSWORD, ""), + CONFIG_TYPE.PW_POOL_MAXSIZE: int(get_env_value(CONFIG_TYPE.PW_POOL_MAXSIZE, "15")), + CONFIG_TYPE.PW_PORT: int(get_env_value(CONFIG_TYPE.PW_PORT, "8675")), + CONFIG_TYPE.PW_SITEID: get_env_value(CONFIG_TYPE.PW_SITEID, None), + CONFIG_TYPE.PW_STYLE: str(get_env_value(CONFIG_TYPE.PW_STYLE, "clear")) + ".js", + CONFIG_TYPE.PW_TIMEOUT: int(get_env_value(CONFIG_TYPE.PW_TIMEOUT, "5")), + CONFIG_TYPE.PW_TIMEZONE: get_env_value(CONFIG_TYPE.PW_TIMEZONE, "America/Los_Angeles") + } + configs.append(config) + return configs + + +def build_configuration() -> List[PROXY_CONFIG]: + COOKIE_SUFFIX: Final[str] = "path=/;SameSite=None;Secure;" + configs = read_env_configs() + if len(configs) == 0: + log.error("No Tesla Energy Gateway configurations found. This should never happen. Proxy cannot start.") + exit(0) + + for config in configs: + # HTTP/S configuration + if config[CONFIG_TYPE.PW_HTTPS] == "yes": + config[CONFIG_TYPE.PW_COOKIE_SUFFIX] = COOKIE_SUFFIX + config[CONFIG_TYPE.PW_HTTP_TYPE] = "HTTPS" + elif config[CONFIG_TYPE.PW_HTTPS] == "http": + config[CONFIG_TYPE.PW_COOKIE_SUFFIX] = COOKIE_SUFFIX + config[CONFIG_TYPE.PW_HTTP_TYPE] = "HTTP" + else: + config[CONFIG_TYPE.PW_COOKIE_SUFFIX] = "path=/;" + config[CONFIG_TYPE.PW_HTTP_TYPE] = "HTTP" + + if config[CONFIG_TYPE.PW_CACHE_FILE] == "": + config[CONFIG_TYPE.PW_CACHE_FILE] = os.path.join(config[CONFIG_TYPE.PW_AUTH_PATH], ".powerwall") if config[CONFIG_TYPE.PW_AUTH_PATH] else ".powerwall" + + # Check for cache expire time limit below 5s + if int(config[CONFIG_TYPE.PW_CACHE_EXPIRE]) < 5: + log.warning(f"Cache expiration set below 5s for host:port={config[CONFIG_TYPE.PW_HOST]}:{config[CONFIG_TYPE.PW_PORT]} (PW_CACHE_EXPIRE={config[CONFIG_TYPE.PW_CACHE_EXPIRE]})") + + return configs + + +def run_server(host, port, enable_https, configuration: PROXY_CONFIG, pw: pypowerwall.Powerwall, pw_control: pypowerwall.Powerwall): + proxystats: PROXY_STATS = { + PROXY_STATS_TYPE.CF: configuration[CONFIG_TYPE.PW_CACHE_FILE], + PROXY_STATS_TYPE.CLEAR: int(time.time()), + PROXY_STATS_TYPE.CLOUDMODE: False, + PROXY_STATS_TYPE.CONFIG: configuration.copy(), + PROXY_STATS_TYPE.COUNTER: 0, + PROXY_STATS_TYPE.ERRORS: 0, + PROXY_STATS_TYPE.FLEETAPI: False, + PROXY_STATS_TYPE.GETS: 0, + PROXY_STATS_TYPE.MEM: 0, + PROXY_STATS_TYPE.MODE: "Unknown", + PROXY_STATS_TYPE.POSTS: 0, + PROXY_STATS_TYPE.PW3: False, + PROXY_STATS_TYPE.PYPOWERWALL: f"{pypowerwall.version} Proxy {BUILD}", + PROXY_STATS_TYPE.SITE_NAME: "", + PROXY_STATS_TYPE.SITEID: None, + PROXY_STATS_TYPE.START: int(time.time()), + PROXY_STATS_TYPE.TEDAPI_MODE: "off", + PROXY_STATS_TYPE.TEDAPI: False, + PROXY_STATS_TYPE.TIMEOUT: 0, + PROXY_STATS_TYPE.TS: int(time.time()), + PROXY_STATS_TYPE.UPTIME: "", + PROXY_STATS_TYPE.URI: {} + } + + def handler_factory(*args, **kwargs): + return Handler(*args, configuration=configuration, pw=pw, pw_control=pw_control, proxy_stats=proxystats, **kwargs) + + with ThreadingHTTPServer((host, port), handler_factory) as server: + if enable_https: + log.debug(f"Activating HTTPS on {host}:{port}") + server.socket = ssl.wrap_socket( + server.socket, + certfile=os.path.join(os.path.dirname(__file__), 'localhost.pem'), + server_side=True, + ssl_version=ssl.PROTOCOL_TLSv1_2, + ca_certs=None, + do_handshake_on_connect=True + ) + try: + log.info(f"Starting server on {host}:{port}") + site_name = pw.site_name() or "Unknown" + if pw.cloudmode or pw.fleetapi: + if pw.fleetapi: + proxystats[PROXY_STATS_TYPE.MODE] = "FleetAPI" + log.info("pyPowerwall Proxy Server - FleetAPI Mode") + else: + proxystats[PROXY_STATS_TYPE.MODE] = "Cloud" + log.info("pyPowerwall Proxy Server - Cloud Mode") + log.info(f"Connected to Site ID {pw.client.siteid} ({site_name.strip()})") + if configuration[CONFIG_TYPE.PW_SITEID] is not None and configuration[CONFIG_TYPE.PW_SITEID] != str(pw.client.siteid): + log.info(f"Switch to Site ID {configuration[CONFIG_TYPE.PW_SITEID]}") + if not pw.client.change_site(configuration[CONFIG_TYPE.PW_SITEID]): + log.error("Fatal Error: Unable to connect. Please fix config and restart.") + while True: + try: + time.sleep(5) # Infinite loop to keep container running + except (KeyboardInterrupt, SystemExit): + sys.exit(0) + else: + proxystats[PROXY_STATS_TYPE.MODE] = "Local" + log.info("pyPowerwall Proxy Server - Local Mode") + log.info(f"Connected to Energy Gateway {configuration[CONFIG_TYPE.PW_HOST]} ({site_name.strip()})") + if pw.tedapi: + proxystats[PROXY_STATS_TYPE.TEDAPI] = True + proxystats[PROXY_STATS_TYPE.TEDAPI_MODE] = pw.tedapi_mode + proxystats[PROXY_STATS_TYPE.PW3] = pw.tedapi.pw3 + log.info(f"TEDAPI Mode Enabled for Device Vitals ({pw.tedapi_mode})") + + server.serve_forever() + except (Exception, KeyboardInterrupt, SystemExit): + log.info(f"Server on {host}:{port} stopped") + sys.exit(0) + + +def main() -> None: + servers: List[threading.Thread] = [] + configs = build_configuration() + + for config in configs: + try: + pw = pypowerwall.Powerwall( + host=config[CONFIG_TYPE.PW_HOST], + password=config[CONFIG_TYPE.PW_PASSWORD], + email=config[CONFIG_TYPE.PW_EMAIL], + timezone=config[CONFIG_TYPE.PW_TIMEZONE], + pwcacheexpire=config[CONFIG_TYPE.PW_CACHE_EXPIRE], + timeout=config[CONFIG_TYPE.PW_TIMEOUT], + poolmaxsize=config[CONFIG_TYPE.PW_POOL_MAXSIZE], + siteid=config[CONFIG_TYPE.PW_SITEID], + authpath=config[CONFIG_TYPE.PW_AUTH_PATH], + authmode=config[CONFIG_TYPE.PW_AUTH_MODE], + cachefile=config[CONFIG_TYPE.PW_CACHE_FILE], + auto_select=True, + retry_modes=True, + gw_pwd=config[CONFIG_TYPE.PW_GW_PWD] + ) + except Exception as e: + log.error(e) + log.error("Fatal Error: Unable to connect. Please fix config and restart.") + while True: + try: + time.sleep(5) # Infinite loop to keep container running + except (KeyboardInterrupt, SystemExit): + sys.exit(0) + + pw_control = configure_pw_control(pw, config) + + server = threading.Thread( + target=run_server, + args=( + config[CONFIG_TYPE.PW_BIND_ADDRESS], # Host + config[CONFIG_TYPE.PW_PORT], # Port + config[CONFIG_TYPE.PW_HTTPS] == "yes", # HTTPS + config, + pw, + pw_control + ) + ) + servers.append(server) + + # Start all server threads + for config, server in zip(configs, servers): + log.info( + f"pyPowerwall [{pypowerwall.version}] Proxy Server [{BUILD}] - {config[CONFIG_TYPE.PW_HTTP_TYPE]} Port {config[CONFIG_TYPE.PW_PORT]}{' - DEBUG' if SERVER_DEBUG else ''}" + ) + log.info("pyPowerwall Proxy Started\n") + server.start() + + # Wait for all server threads to finish + for server in servers: + server.join() log.info("pyPowerwall Proxy Stopped") sys.exit(0) + + +if __name__ == '__main__': + main() diff --git a/proxy/web/example.html b/proxy/web/example.html index cafcb3a..d6ae868 100644 --- a/proxy/web/example.html +++ b/proxy/web/example.html @@ -16,7 +16,7 @@