Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIXED version of implementing error checking #9

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 110 additions & 44 deletions bme680.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,38 +99,37 @@ class Adafruit_BME680:
def __init__(self, *, refresh_rate=10):
"""Check the BME680 was found, read the coefficients and enable the sensor for continuous
reads."""
self._write(_BME680_REG_SOFTRESET, [0xB6])
time.sleep(0.005)

# Check device ID.
chip_id = self._read_byte(_BME680_REG_CHIPID)
if chip_id != _BME680_CHIPID:
raise RuntimeError('Failed to find BME680! Chip ID 0x%x' % chip_id)

self._read_calibration()

# set up heater
self._write(_BME680_BME680_RES_HEAT_0, [0x73])
self._write(_BME680_BME680_GAS_WAIT_0, [0x65])

self.sea_level_pressure = 1013.25
"""Pressure in hectoPascals at sea level. Used to calibrate ``altitude``."""

# Default oversampling and filter register values.
self._pressure_oversample = 0b011
self._temp_oversample = 0b100
self._humidity_oversample = 0b010
self._filter = 0b010

self._adc_pres = None
self._adc_temp = None
self._adc_hum = None
self._adc_gas = None
self._gas_range = None
self._t_fine = None

self._last_reading = time.ticks_ms()
self._min_refresh_time = 1000 // refresh_rate
self._is_detected()
if self._detected == True:
self._read_calibration()

# set up heater
self._write(_BME680_BME680_RES_HEAT_0, [0x73])
self._write(_BME680_BME680_GAS_WAIT_0, [0x65])

self.sea_level_pressure = 1013.25
"""Pressure in hectoPascals at sea level. Used to calibrate ``altitude``."""

# Default oversampling and filter register values.
self._pressure_oversample = 0b011
self._temp_oversample = 0b100
self._humidity_oversample = 0b010
self._filter = 0b010

self._last_pres = 0
self._adc_pres = None
self._adc_temp = None
self._adc_hum = None
self._adc_gas = None
self._gas_range = None
self._t_fine = None

self._last_reading = time.ticks_ms()
self._min_refresh_time = 1000 // refresh_rate
else:
# BME680 not detected, so return None for outside validation
return(None)

@property
def pressure_oversample(self):
Expand Down Expand Up @@ -183,14 +182,22 @@ def filter_size(self, size):
@property
def temperature(self):
"""The compensated temperature in degrees celsius."""
self._perform_reading()
calc_temp = (((self._t_fine * 5) + 128) / 256)
return calc_temp / 100
if self._debug: print(f"\t Reading temperature... ", end="")
result = self._perform_reading()
if result is None:
if self._debug: print(f"...temp was None!")
return None
if self._debug: print(f"...fixing up temperature... ", end="")
calc_temp = (((self._t_fine * 5) + 128) / 256) / 100
if self._debug: print(f"...temperature is {calc_temp} ")
return calc_temp

@property
def pressure(self):
"""The barometric pressure in hectoPascals"""
self._perform_reading()
result = self._perform_reading()
if result is None:
return None
var1 = (self._t_fine / 2) - 64000
var2 = ((var1 / 4) * (var1 / 4)) / 2048
var2 = (var2 * self._pressure_calibration[5]) / 4
Expand All @@ -208,12 +215,24 @@ def pressure(self):
var2 = ((calc_pres / 4) * self._pressure_calibration[7]) / 8192
var3 = (((calc_pres / 256) ** 3) * self._pressure_calibration[9]) / 131072
calc_pres += ((var1 + var2 + var3 + (self._pressure_calibration[6] * 128)) / 16)
return calc_pres/100
if calc_pres > 1500:
calc_pres = calc_pres / 100
if calc_pres > 630:
# this looks valid, so cache it and return the value
self._last_pres = calc_pres
return calc_pres
if self._last_pres > 0:
# calc value is too low, so if we have a "good" prior read, return that
return self._last_pres
# otherwise just return the weird value, and hopefully we catch up
return calc_pres

@property
def humidity(self):
"""The relative humidity in RH %"""
self._perform_reading()
result = self._perform_reading()
if result is None:
return None
temp_scaled = ((self._t_fine * 5) + 128) / 256
var1 = ((self._adc_hum - (self._humidity_calibration[0] * 16)) -
((temp_scaled * self._humidity_calibration[2]) / 200))
Expand Down Expand Up @@ -245,16 +264,49 @@ def altitude(self):
@property
def gas(self):
"""The gas resistance in ohms"""
self._perform_reading()
result = self._perform_reading()
if result is None:
return None
var1 = ((1340 + (5 * self._sw_err)) * (_LOOKUP_TABLE_1[self._gas_range])) / 65536
var2 = ((self._adc_gas * 32768) - 16777216) + var1
var3 = (_LOOKUP_TABLE_2[self._gas_range] * var1) / 512
calc_gas_res = (var3 + (var2 / 2)) / var2
return int(calc_gas_res)

@property
def detected(self):
"""Whether the BME600 was detected"""
return self._detected

def _is_detected(self):
"""Check if the BME680 was found"""
if self._debug: print("Resetting BME680")
self._write(_BME680_REG_SOFTRESET, [0xB6])
time.sleep(0.005)

if self._debug: print("Attempting to read from BME680")
# Check device ID.
chip_id = self._read_byte(_BME680_REG_CHIPID)
# No chip found at this address
if chip_id is None:
if self._debug: print("chip_id is None")
self._detected = False
# Unsupported chip found at this address
elif chip_id != _BME680_CHIPID:
if self._debug: print(f"chip_id is {chip_id}")
self._detected = False
# Detected
else:
if self._debug: print("BME680 found!")
self._detected = True

def _perform_reading(self):
"""Perform a single-shot reading from the sensor and fill internal data structure for
calculations"""
if self._detected == False:
if self._debug: print("\t perform_reading failed!")
return None

expired = time.ticks_diff(self._last_reading, time.ticks_ms()) * time.ticks_diff(0, 1)
if 0 <= expired < self._min_refresh_time:
time.sleep_ms(self._min_refresh_time - expired)
Expand Down Expand Up @@ -291,6 +343,7 @@ def _perform_reading(self):
var3 = (var3 * self._temp_calibration[2] * 16) / 16384

self._t_fine = int(var2 + var3)
return(True)

def _read_calibration(self):
"""Read & save the calibration coefficients"""
Expand All @@ -316,12 +369,18 @@ def _read_calibration(self):

def _read_byte(self, register):
"""Read a byte register value and return it"""
return self._read(register, 1)[0]
result = self._read(register, 1)
if result is not None:
return result[0]
else:
return None

def _read(self, register, length):
# Overridden by I2C or SPI during init
raise NotImplementedError()

def _write(self, register, values):
# Overridden by I2C or SPI during init
raise NotImplementedError()

class BME680_I2C(Adafruit_BME680):
Expand All @@ -337,23 +396,30 @@ def __init__(self, i2c, address=0x77, debug=False, *, refresh_rate=10):
self._i2c = i2c
self._address = address
self._debug = debug
super().__init__(refresh_rate=refresh_rate)
return super().__init__(refresh_rate=refresh_rate)

def _read(self, register, length):
"""Returns an array of 'length' bytes from the 'register'"""
result = bytearray(length)
self._i2c.readfrom_mem_into(self._address, register & 0xff, result)
try:
self._i2c.readfrom_mem_into(self._address, register & 0xff, result)
except:
result = None
if self._debug:
print("\t${:x} read ".format(register), " ".join(["{:02x}".format(i) for i in result]))
return result
return(result)

def _write(self, register, values):
"""Writes an array of 'length' bytes to the 'register'"""
if self._debug:
print("\t${:x} write".format(register), " ".join(["{:02x}".format(i) for i in values]))
for value in values:
self._i2c.writeto_mem(self._address, register, bytearray([value & 0xFF]))
register += 1
try:
self._i2c.writeto_mem(self._address, register, bytearray([value & 0xFF]))
register += 1
return(0)
except:
return(None)


class BME680_SPI(Adafruit_BME680):
Expand Down
20 changes: 12 additions & 8 deletions bmetest.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from bme680 import *
from machine import I2C, Pin
import time
bme = BME680_I2C(I2C(-1, Pin(13), Pin(12)))

for _ in range(3):
print(bme.temperature, bme.humidity, bme.pressure, bme.gas)
time.sleep(1)
try: from bme680 import *
except: pass
try: from machine import I2C, Pin
except: pass
from time import sleep

try:
bme = BME680_I2C(I2C(-1, Pin(13), Pin(12)))

for _ in range(3):
print(bme.temperature, bme.humidity, bme.pressure, bme.gas)
sleep(1)
except:
print("BME680 not detected")