Skip to content

Commit

Permalink
Add Bearer token authmode option #57
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonacox committed Jan 3, 2024
1 parent a81b5e5 commit 2c00ba4
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 22 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ and call function to poll data. Here is an example:
Classes
Powerwall(host, password, email, timezone, pwcacheexpire, timeout, poolmaxsize,
cloudmode, siteid, authpath)
cloudmode, siteid, authpath, authmode)
Parameters
host # Hostname or IP of the Tesla gateway
Expand All @@ -136,6 +136,7 @@ and call function to poll data. Here is an example:
cloudmode = False # If True, use Tesla cloud for data (default is False)
siteid # If cloudmode is True, use this siteid (default is None)
authpath # Path to cloud auth and site cache files (default is "")
authmode = "cookie" # "cookie" (default) or "token" - use cookie or bearer token for auth
Functions
poll(api, json, force) # Return data from Powerwall api (dict if json=True, bypass cache force=True)
Expand Down
11 changes: 11 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# RELEASE NOTES

## v0.7.4 - Bearer Token Auth

* This release adds the ability to use a Bearer Token for Authentication for the local Powerwall gateway API calls. This is selectable by defining `authmode='token'` in the initialization. The default mode uses the existing `AuthCookie` and `UserRecord` method.

```python
import pypowerwall

pw = pypowerwall.Powerwall(HOST, PASSWORD, EMAIL, TIMEZONE, authmode="token")
```


## v0.7.3 - Cloud Mode Setup

* Setup will now check for `PW_AUTH_PATH` environmental variable to set the path for `.pypowerwall.auth` and `.pypowerwall.site` by @mcbirse in https://github.com/jasonacox/pypowerwall/pull/62
Expand Down
40 changes: 29 additions & 11 deletions proxy/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import ssl
from transform import get_static, inject_js

BUILD = "t37"
BUILD = "t38"
ALLOWLIST = [
'/api/status', '/api/site_info/site_name', '/api/meters/site',
'/api/meters/solar', '/api/sitemaster', '/api/powerwalls',
Expand Down Expand Up @@ -72,6 +72,7 @@
style = os.getenv("PW_STYLE", "clear") + ".js"
siteid = os.getenv("PW_SITEID", None)
authpath = os.getenv("PW_AUTH_PATH", "")
authmode = os.getenv("PW_AUTH_MODE", "cookie")

# Global Stats
proxystats = {}
Expand Down Expand Up @@ -135,7 +136,8 @@ def get_value(a, key):
# TODO: Add support for multiple Powerwalls
try:
pw = pypowerwall.Powerwall(host,password,email,timezone,cache_expire,
timeout,pool_maxsize,siteid=siteid,authpath=authpath)
timeout,pool_maxsize,siteid=siteid,
authpath=authpath,authmode=authmode)
except Exception as e:
log.error(e)
log.error("Fatal Error: Unable to connect. Please fix config and restart.")
Expand Down Expand Up @@ -222,6 +224,7 @@ def do_GET(self):
if pw.cloudmode and pw.Tesla is not None:
proxystats['siteid'] = pw.Tesla.siteid
proxystats['counter'] = pw.Tesla.counter
proxystats['authmode'] = pw.authmode
message = json.dumps(proxystats)
elif self.path == '/stats/clear':
# Clear Internal Stats
Expand Down Expand Up @@ -334,6 +337,7 @@ def do_GET(self):
if pw.cloudmode and pw.Tesla is not None:
proxystats['siteid'] = pw.Tesla.siteid
proxystats['counter'] = pw.Tesla.counter
proxystats['authmode'] = pw.authmode
contenttype = 'text/html'
message = '<html>\n<head><meta http-equiv="refresh" content="5" />\n'
message += '<style>p, td, th { font-family: Helvetica, Arial, sans-serif; font-size: 10px;}</style>\n'
Expand All @@ -355,8 +359,13 @@ def do_GET(self):
else:
# Everything else - Set auth headers required for web application
proxystats['gets'] = proxystats['gets'] + 1
self.send_header("Set-Cookie", "AuthCookie={};{}".format(pw.auth['AuthCookie'], cookiesuffix))
self.send_header("Set-Cookie", "UserRecord={};{}".format(pw.auth['UserRecord'], cookiesuffix))
if pw.authmode == "token":
# Create bogus cookies
self.send_header("Set-Cookie", "AuthCookie=1234567890;{}".format(cookiesuffix))
self.send_header("Set-Cookie", "UserRecord=1234567890;{}".format(cookiesuffix))
else:
self.send_header("Set-Cookie", "AuthCookie={};{}".format(pw.auth['AuthCookie'], cookiesuffix))
self.send_header("Set-Cookie", "UserRecord={};{}".format(pw.auth['UserRecord'], cookiesuffix))

# Serve static assets from web root first, if found.
if self.path == "/" or self.path == "":
Expand Down Expand Up @@ -388,13 +397,22 @@ def do_GET(self):
proxy_path = proxy_path[1:]
pw_url = "https://{}/{}".format(pw.host, proxy_path)
log.debug("Proxy request to: {}".format(pw_url))
r = pw.session.get(
url=pw_url,
cookies=pw.auth,
verify=False,
stream=True,
timeout=pw.timeout
)
if pw.authmode == "token":
r = pw.session.get(
url=pw_url,
headers=pw.auth,
verify=False,
stream=True,
timeout=pw.timeout
)
else:
r = pw.session.get(
url=pw_url,
cookies=pw.auth,
verify=False,
stream=True,
timeout=pw.timeout
)
fcontent = r.content
ftype = r.headers['content-type']

Expand Down
46 changes: 36 additions & 10 deletions pypowerwall/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
* Will cache responses for 5s to limit number of calls to Powerwall Gateway
* Will re-use http connections to Powerwall Gateway for reduced load and faster response times
* Can use Tesla Cloud API instead of local Powerwall Gateway (if enabled)
* Uses Auth Cookie or Bearer Token for authorization (configurable)
Classes
Powerwall(host, password, email, timezone, pwcacheexpire, timeout, poolmaxsize,
cloudmode, siteid, authpath)
cloudmode, siteid, authpath, authmode)
Parameters
host # Hostname or IP of the Tesla gateway
Expand All @@ -30,6 +31,7 @@
cloudmode = False # If True, use Tesla cloud for data (default is False)
siteid = None # If cloudmode is True, use this siteid (default is None)
authpath = "" # Path to cloud auth and site files (default current directory)
authmode = "cookie" # "cookie" (default) or "token" - use cookie or bearer token for auth
Functions
poll(api, json, force) # Return data from Powerwall api (dict if json=True, bypass cache force=True)
Expand Down Expand Up @@ -72,7 +74,7 @@
from . import tesla_pb2 # Protobuf definition for vitals
from . import cloud # Tesla Cloud API

version_tuple = (0, 7, 3)
version_tuple = (0, 7, 4)
version = __version__ = '%d.%d.%d' % version_tuple
__author__ = 'jasonacox'

Expand Down Expand Up @@ -101,7 +103,7 @@ class ConnectionError(Exception):
class Powerwall(object):
def __init__(self, host="", password="", email="[email protected]",
timezone="America/Los_Angeles", pwcacheexpire=5, timeout=5, poolmaxsize=10,
cloudmode=False, siteid=None, authpath=""):
cloudmode=False, siteid=None, authpath="", authmode="cookie"):
"""
Represents a Tesla Energy Gateway Powerwall device.
Expand All @@ -117,6 +119,7 @@ def __init__(self, host="", password="", email="[email protected]",
cloudmode = If True, use Tesla cloud for data (default is False)
siteid = If cloudmode is True, use this siteid (default is None)
authpath = Path to cloud auth and site cache files (default current directory)
authmode = "cookie" (default) or "token" - use cookie or bearer token for authorization
"""

Expand All @@ -128,14 +131,16 @@ def __init__(self, host="", password="", email="[email protected]",
self.timezone = timezone
self.timeout = timeout # 5s timeout for http calls
self.poolmaxsize = poolmaxsize # pool max size for http connection re-use
self.auth = {} # caches authentication cookies
self.auth = {} # caches auth cookies
self.token = None # caches bearer token
self.pwcachetime = {} # holds the cached data timestamps for api
self.pwcache = {} # holds the cached data for api
self.pwcacheexpire = pwcacheexpire # seconds to expire cache
self.cloudmode = cloudmode # cloud mode or local mode (default)
self.siteid = siteid # siteid for cloud mode
self.authpath = authpath # path to auth and site cache files
self.Tesla = None # cloud object for cloud connection
self.authmode = authmode # cookie or token

# Check for cloud mode
if self.cloudmode or self.host == "":
Expand All @@ -162,7 +167,16 @@ def __init__(self, host="", password="", email="[email protected]",
try:
f = open(self.cachefile, "r")
self.auth = json.load(f)
log.debug('loaded auth from cache file %s' % self.cachefile)
# Check to see if we have a valid cached session for the mode
if self.authmode == "token":
if 'Authorization' in self.auth:
self.token = self.auth['Authorization'].split(' ')[1]
else:
self.auth = {}
else:
if 'AuthCookie' not in self.auth or 'UserRecord' not in self.auth:
self.auth = {}
log.debug('loaded auth from cache file %s (%s authmode)' % (self.cachefile, self.authmode))
except:
log.debug('no auth cache file')
pass
Expand All @@ -185,7 +199,11 @@ def _get_session(self):

# Save Auth cookies
try:
self.auth = {'AuthCookie': r.cookies['AuthCookie'], 'UserRecord': r.cookies['UserRecord']}
if self.authmode == "token":
self.token = r.json()['token']
self.auth = {'Authorization': 'Bearer ' + self.token}
else:
self.auth = {'AuthCookie': r.cookies['AuthCookie'], 'UserRecord': r.cookies['UserRecord']}
try:
f = open(self.cachefile, "w")
json.dump(self.auth,f)
Expand All @@ -202,7 +220,11 @@ def _close_session(self):
self.Tesla.logout()
return
url = "https://%s/api/logout" % self.host
g = self.session.get(url, cookies=self.auth, verify=False, timeout=self.timeout)
if self.authmode == "token":
g = self.session.get(url, headers=self.auth, verify=False, timeout=self.timeout)
else:
g = self.session.get(url, cookies=self.auth, verify=False, timeout=self.timeout)

self.auth = {}

def is_connected(self):
Expand Down Expand Up @@ -256,10 +278,11 @@ def poll(self, api='/api/site_info/site_name', jsonformat=False, raw=False, recu

url = "https://%s%s" % (self.host, api)
try:
if(raw):
r = self.session.get(url, cookies=self.auth, verify=False, timeout=self.timeout, stream=True)
if self.authmode == "token":
r = self.session.get(url, headers=self.auth, verify=False, timeout=self.timeout, stream=raw)
else:
r = self.session.get(url, cookies=self.auth, verify=False, timeout=self.timeout)
r = self.session.get(url, cookies=self.auth, verify=False, timeout=self.timeout, stream=raw)

except requests.exceptions.Timeout:
log.debug('ERROR Timeout waiting for Powerwall API %s' % url)
return None
Expand All @@ -272,6 +295,9 @@ def poll(self, api='/api/site_info/site_name', jsonformat=False, raw=False, recu
if r.status_code >= 400 and r.status_code < 500:
# Session Expired - Try to get a new one unless we already tried
if(not recursive):
if raw:
# Drain the stream
payload = r.raw.data
self._get_session()
return self.poll(api, jsonformat, raw, True)
else:
Expand Down

0 comments on commit 2c00ba4

Please sign in to comment.