diff --git a/Cargo.lock b/Cargo.lock index d5096e2..c8515d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2732,6 +2732,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" dependencies = [ "getrandom", + "serde", ] [[package]] diff --git a/healthpi-bt/Cargo.toml b/healthpi-bt/Cargo.toml index b643d38..52a646d 100644 --- a/healthpi-bt/Cargo.toml +++ b/healthpi-bt/Cargo.toml @@ -10,7 +10,7 @@ btleplug = { version = "0.11.5", optional = true } futures = "0.3.21" mockall = "0.11.3" serde = { version = "1.0.152", features = ["derive"] } -uuid = "1.1.2" +uuid = { version = "1.1.2", features = ["serde"] } [features] default = ["btleplug"] diff --git a/healthpi-bt/src/api.rs b/healthpi-bt/src/api.rs index 3f4a4f8..91734b7 100644 --- a/healthpi-bt/src/api.rs +++ b/healthpi-bt/src/api.rs @@ -4,8 +4,6 @@ use async_trait::async_trait; use futures::Stream; use uuid::Uuid; -use super::macaddress::MacAddress; - #[derive(Debug)] pub enum DeviceError { ConnectionFailure(String), @@ -35,6 +33,21 @@ pub trait BleCharacteristic: Send + Sync + fmt::Debug { async fn read(&self) -> Result, DeviceError>; } +#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +pub struct DeviceId(String); + +impl DeviceId { + pub fn new(device_id: String) -> Self { + Self(device_id) + } +} + +impl fmt::Display for DeviceId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + #[mockall::automock] #[async_trait] pub trait BleDevice: Send + Sync { @@ -42,7 +55,7 @@ pub trait BleDevice: Send + Sync { async fn disconnect(&self) -> Result<(), DeviceError>; fn in_range(&self) -> bool; - fn mac_address(&self) -> MacAddress; + fn id(&self) -> DeviceId; fn name(&self) -> String; async fn get_characteristic( diff --git a/healthpi-bt/src/btleplug.rs b/healthpi-bt/src/btleplug.rs index 9a6b73e..02cfa1b 100644 --- a/healthpi-bt/src/btleplug.rs +++ b/healthpi-bt/src/btleplug.rs @@ -10,8 +10,9 @@ use futures::future; use futures::{lock::Mutex, Stream, StreamExt}; use uuid::Uuid; +use crate::api::DeviceId; + use super::api::{BleCharacteristic, BleCharacteristicEvent, BleDevice, BleSession, DeviceError}; -use super::macaddress::MacAddress; impl From for BleCharacteristicEvent { fn from(value: ValueNotification) -> Self { @@ -134,15 +135,19 @@ impl BleDevice for BleDeviceImpl { self.properties.rssi.is_some() } - fn mac_address(&self) -> MacAddress { - self.properties.address.into_inner().into() + fn id(&self) -> DeviceId { + if cfg!(target_os = "macos") { + DeviceId::new(self.peripheral.id().to_string()) + } else { + DeviceId::new(self.properties.address.to_string()) + } } fn name(&self) -> String { self.properties .local_name .clone() - .unwrap_or(self.mac_address().to_string()) + .unwrap_or(self.id().to_string()) } async fn get_characteristic( diff --git a/healthpi-bt/src/lib.rs b/healthpi-bt/src/lib.rs index 9d8ab72..97ed777 100644 --- a/healthpi-bt/src/lib.rs +++ b/healthpi-bt/src/lib.rs @@ -9,7 +9,7 @@ mod btleplug; mod macaddress; pub use api::{ - BleCharacteristic, BleCharacteristicEvent, BleDevice, BleSession, DeviceError, + BleCharacteristic, BleCharacteristicEvent, BleDevice, BleSession, DeviceError, DeviceId, MockBleCharacteristic, MockBleDevice, MockBleSession, }; #[cfg(feature = "bluez")] diff --git a/healthpi-db/Cargo.toml b/healthpi-db/Cargo.toml index 108ab11..97635cb 100644 --- a/healthpi-db/Cargo.toml +++ b/healthpi-db/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] healthpi-bt = { path = "../healthpi-bt" } chrono = { version = "0.4.19", features = ["serde"] } -sqlx = { version = "0.6", features = [ "runtime-tokio-rustls", "sqlite" ] } +sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "sqlite"] } dotenv = "0.15.0" log = "0.4.17" num = "0.4" diff --git a/healthpi-db/src/measurement.rs b/healthpi-db/src/measurement.rs index 0792e08..898019c 100644 --- a/healthpi-db/src/measurement.rs +++ b/healthpi-db/src/measurement.rs @@ -1,5 +1,5 @@ use chrono::NaiveDateTime; -use healthpi_bt::MacAddress; +use healthpi_bt::DeviceId; use num::FromPrimitive; use num_derive::FromPrimitive; use serde::{Deserialize, Serialize}; @@ -92,7 +92,7 @@ impl TryFrom<(usize, f64)> for Value { #[derive(Clone, Debug, Hash, PartialEq, Deserialize, Serialize)] pub enum Source { - Device(MacAddress), + Device(DeviceId), Unknown(String), } diff --git a/healthpi-loader/src/devices/contour.rs b/healthpi-loader/src/devices/contour.rs index 77d390f..f3e3e74 100644 --- a/healthpi-loader/src/devices/contour.rs +++ b/healthpi-loader/src/devices/contour.rs @@ -40,7 +40,7 @@ impl ElitePlus { timestamp, vec![Value::Glucose(glucose as i32)], event.value, - Source::Device(self.ble_device.mac_address()), + Source::Device(self.ble_device.id()), ), )) } diff --git a/healthpi-loader/src/devices/device.rs b/healthpi-loader/src/devices/device.rs index 5805798..07b0b2b 100644 --- a/healthpi-loader/src/devices/device.rs +++ b/healthpi-loader/src/devices/device.rs @@ -1,11 +1,10 @@ use std::collections::HashMap; use std::io::{BufRead, BufReader}; -use std::str::FromStr; use std::{collections::HashSet, error::Error, fs::File}; use async_trait::async_trait; use chrono::{DateTime, Local, Utc}; -use healthpi_bt::{BleDevice, MacAddress}; +use healthpi_bt::{BleDevice, DeviceId}; use log::{debug, info, warn}; use healthpi_db::measurement::Record; @@ -21,19 +20,19 @@ pub trait Device { } struct BackoffTable { - expiry_timestamps: HashMap>, + expiry_timestamps: HashMap>, } impl BackoffTable { fn new() -> Self { Self { - expiry_timestamps: HashMap::>::new(), + expiry_timestamps: HashMap::>::new(), } } fn check(&self, device: &dyn BleDevice) -> bool { self.expiry_timestamps - .get(&device.mac_address()) + .get(&device.id()) .filter(|expiry| expiry > &&chrono::Utc::now()) .is_some() } @@ -42,8 +41,7 @@ impl BackoffTable { let backoff_expiry = chrono::Utc::now() .checked_add_signed(chrono::Duration::minutes(5)) .unwrap(); - self.expiry_timestamps - .insert(device.mac_address(), backoff_expiry); + self.expiry_timestamps.insert(device.id(), backoff_expiry); backoff_expiry } } @@ -55,13 +53,13 @@ pub trait Factory: Send + Sync { } pub struct FactoryImpl { - paired_devices: HashSet, + paired_devices: HashSet, backoff_table: BackoffTable, } impl FactoryImpl { #[allow(dead_code)] - pub fn new(paired_devices: HashSet) -> Self { + pub fn new(paired_devices: HashSet) -> Self { Self { paired_devices, backoff_table: BackoffTable::new(), @@ -70,11 +68,10 @@ impl FactoryImpl { pub fn from_file(path: &str) -> std::io::Result { let file = File::open(path)?; - let paired_devices: HashSet = BufReader::new(file) + let paired_devices: HashSet = BufReader::new(file) .lines() .map_while(|l| l.ok()) - .map(|s| MacAddress::from_str(&s)) - .filter_map(|l| l.ok()) + .map(DeviceId::new) .collect(); info!("Loaded {} paired devices from file", paired_devices.len()); @@ -85,7 +82,7 @@ impl FactoryImpl { impl Factory for FactoryImpl { fn make_device(&self, ble_device: Box) -> Option> { - if !ble_device.in_range() || !self.paired_devices.contains(&ble_device.mac_address()) { + if !ble_device.in_range() || !self.paired_devices.contains(&ble_device.id()) { None } else if self.backoff_table.check(&*ble_device) { debug!( @@ -101,8 +98,8 @@ impl Factory for FactoryImpl { Some(Box::new(soehnle::SystoMC400::new(ble_device))) } else { warn!( - "Device with MAC={} is not of any supported types", - ble_device.mac_address() + "Device with ID={} is not of any supported types", + ble_device.id() ); None } diff --git a/healthpi-loader/src/devices/soehnle.rs b/healthpi-loader/src/devices/soehnle.rs index 61a9e2c..511eb8e 100644 --- a/healthpi-loader/src/devices/soehnle.rs +++ b/healthpi-loader/src/devices/soehnle.rs @@ -3,10 +3,10 @@ use std::time::Duration; use async_trait::async_trait; use chrono::{NaiveDateTime, Utc}; use futures::StreamExt; -use healthpi_bt::{BleCharacteristicEvent, BleDevice, MacAddress}; +use healthpi_bt::{BleCharacteristicEvent, BleDevice, DeviceId}; use healthpi_db::measurement::{Record, Source, Value}; use healthpi_db::user::User; -use log::{debug, info}; +use log::{debug, info, trace}; use tokio::time::timeout; use uuid::Uuid; @@ -58,9 +58,18 @@ impl Shape200 { timestamp, values, event.value, - Source::Device(self.ble_device.mac_address()), + Source::Device(self.ble_device.id()), )) } + + fn user_from_event(event: BleCharacteristicEvent) -> User { + User::new( + event.value[3], + event.value[4] != 0, + u16::from_be_bytes([event.value[5], event.value[6]]), + event.value[9], + ) + } } #[async_trait] @@ -92,25 +101,33 @@ impl Device for Shape200 { info!("Subscribing to notifications"); let mut events = weight_characteristic.subscribe().await?; + // Request user information cmd_characteristic.write_with_response(&[0x0c, 1]).await?; info!("Reading user data"); let user = if let Some(event) = events.next().await { - User::new( - event.value[3], - event.value[4] != 0, - u16::from_be_bytes([event.value[5], event.value[6]]), - event.value[9], - ) + Self::user_from_event(event) } else { panic!("Did not receive user data!") }; + trace!("User: {:?}", user); + + // Consume remaining user data events before requesting measurement notifications. + // Otherwise measurement events might not come. + while let Ok(Some(event)) = timeout(Duration::from_secs(1), events.next()).await { + trace!( + "Additional user received: {:?}", + Self::user_from_event(event) + ); + } + // Request measurement data for user 1. cmd_characteristic.write_with_response(&[0x09, 1]).await?; info!("Processing measurement notifications"); let mut records = Vec::new(); - while let Ok(Some(event)) = timeout(Duration::from_millis(1000), events.next()).await { + while let Ok(Some(event)) = timeout(Duration::from_secs(1), events.next()).await { + trace!("Event received: {:?}", event.value); if let Some(record) = self.read_record(&user, event) { records.push(record); } @@ -198,7 +215,7 @@ impl SystoMC400 { Self { ble_device } } - fn read_record(raw_data: Vec, mac_address: MacAddress) -> Option { + fn read_record(raw_data: Vec, device_id: DeviceId) -> Option { let mut i = 1; let mut values = Vec::new(); @@ -228,13 +245,11 @@ impl SystoMC400 { values.push(Value::HeartRate(heart_rate as i32)); } - let raw_mac_address: [u8; 6] = mac_address.into(); - Some(Record::new( timestamp, values, raw_data, - Source::Device(raw_mac_address.into()), + Source::Device(device_id), )) } } @@ -267,8 +282,7 @@ impl Device for SystoMC400 { let mut prev_timestamp = NaiveDateTime::MIN; let mut timestamp_duplicate_count = 0; while let Ok(Some(event)) = timeout(Duration::from_millis(5000), events.next()).await { - if let Some(mut record) = Self::read_record(event.value, self.ble_device.mac_address()) - { + if let Some(mut record) = Self::read_record(event.value, self.ble_device.id()) { if record.timestamp == prev_timestamp { timestamp_duplicate_count += 1; record.timestamp += chrono::Duration::seconds(timestamp_duplicate_count); @@ -291,13 +305,13 @@ impl Device for SystoMC400 { #[cfg(test)] mod tests { - use chrono::{NaiveDate, NaiveTime}; - use super::*; + use chrono::{NaiveDate, NaiveTime}; + #[test] fn read_record_all_fields_present() { - let mac_address = MacAddress::from([0x12, 0x34, 0x56, 0x78, 0x9A, 0xBC]); + let device_id = DeviceId::new("12:34:56:78:9A:BC".into()); let raw_data = vec![ 30, 128, 0, 75, 0, 93, 0, 230, 7, 8, 4, 13, 49, 0, 80, 0, 0, 0, 0, ]; @@ -312,17 +326,17 @@ mod tests { Value::HeartRate(80), ], raw_data.clone(), - Source::Device(mac_address), + Source::Device(device_id.clone()), ); - let record = SystoMC400::read_record(raw_data.clone(), mac_address).unwrap(); + let record = SystoMC400::read_record(raw_data.clone(), device_id).unwrap(); assert_eq!(record, expected); } #[test] fn read_record_all_fields_present_kpa() { - let mac_address = MacAddress::from([0x12, 0x34, 0x56, 0x78, 0x9A, 0xBC]); + let device_id = DeviceId::new("12:34:56:78:9A:BC".into()); // 128 mmHg = 17100 Pa = 66 * 256 + 204 // 75 mmHg = 10000 Pa = 39 * 256 + 16 let raw_data = vec![ @@ -339,17 +353,17 @@ mod tests { Value::HeartRate(80), ], raw_data.clone(), - Source::Device(mac_address), + Source::Device(device_id.clone()), ); - let record = SystoMC400::read_record(raw_data.clone(), mac_address).unwrap(); + let record = SystoMC400::read_record(raw_data.clone(), device_id).unwrap(); assert_eq!(record, expected); } #[test] fn read_record_without_timestamp() { - let mac_address = MacAddress::from([0x12, 0x34, 0x56, 0x78, 0x9A, 0xBC]); + let device_id = DeviceId::new("12:34:56:78:9A:BC".into()); let raw_data = vec![28, 128, 0, 75, 0, 93, 0, 80, 0, 0, 0, 0]; let expected_values = vec![ Value::BloodPressureSystolic(128), @@ -357,16 +371,16 @@ mod tests { Value::HeartRate(80), ]; - let record = SystoMC400::read_record(raw_data.clone(), mac_address).unwrap(); + let record = SystoMC400::read_record(raw_data.clone(), device_id.clone()).unwrap(); assert_eq!(record.raw_data, raw_data); - assert_eq!(record.source, Source::Device(mac_address)); + assert_eq!(record.source, Source::Device(device_id)); assert_eq!(record.values, expected_values); } #[test] fn read_record_without_heart_rate() { - let mac_address = MacAddress::from([0x12, 0x34, 0x56, 0x78, 0x9A, 0xBC]); + let device_id = DeviceId::new("12:34:56:78:9A:BC".into()); let raw_data = vec![26, 128, 0, 75, 0, 93, 0, 230, 7, 8, 4, 13, 49, 0, 0, 0, 0]; let expected = Record::new( NaiveDateTime::new( @@ -378,10 +392,10 @@ mod tests { Value::BloodPressureDiastolic(75), ], raw_data.clone(), - Source::Device(mac_address), + Source::Device(device_id.clone()), ); - let record = SystoMC400::read_record(raw_data.clone(), mac_address).unwrap(); + let record = SystoMC400::read_record(raw_data.clone(), device_id).unwrap(); assert_eq!(record, expected); } diff --git a/log4rs.yml b/log4rs.yml index 584a5fa..b297f59 100644 --- a/log4rs.yml +++ b/log4rs.yml @@ -3,6 +3,9 @@ appenders: kind: console encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%3f)} {h({l})} {t}: {m}{n}" + filters: + - kind: threshold + level: debug application: kind: file @@ -14,6 +17,7 @@ appenders: pattern: "{d(%Y-%m-%d %H:%M:%S.%3f)} {h({l})} {t}: {m}{n}" root: + level: trace appenders: - stdout - application @@ -23,3 +27,5 @@ loggers: level: error sqlx::query: level: error + btleplug: + level: error