Skip to content

Commit

Permalink
✨ MQTT integration (#7)
Browse files Browse the repository at this point in the history
* ⬆️ Specify edition in Cargo toml

* Add MQTTReadingsSensor

* Integrate MQTTSender into main
  • Loading branch information
liamadamson authored Nov 26, 2023
1 parent e6e8ffd commit ea12ee1
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 15 deletions.
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
[package]
name = "room-monitor"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.75"
base64 = "0.21.5"
env_logger = "0.10.1"
log = "0.4.20"
mockall = "0.11.4"
paho-mqtt = "0.12.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.108"
tokio = { version = "1.34.0", features = ["sync"] }

[target.'cfg(target_os = "linux")'.dependencies]
bme280 = "0.4.4"
linux-embedded-hal = "0.4.0-alpha.2"
linux-embedded-hal = "0.4.0-alpha.2"
4 changes: 2 additions & 2 deletions src/driver/linux.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
extern crate bme280;
extern crate linux_embedded_hal as hal;
use bme280;
use linux_embedded_hal as hal;

use super::{BME280Driver, Readings};

Expand Down
1 change: 1 addition & 0 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub fn get_bme280_driver() -> anyhow::Result<impl BME280Driver> {
Ok(driver)
}

#[derive(serde::Serialize)]
pub struct Readings {
pub temperature: f32,
pub pressure: f32,
Expand Down
76 changes: 64 additions & 12 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,50 @@
pub mod driver;
pub mod mqtt;

use base64::Engine;
use driver::{get_bme280_driver, BME280Driver};
use mqtt::MQTTReadingsSender;
use std::env;
use std::thread::sleep;
use std::time::Duration;

const DEFAULT_MEASUREMENT_RATE_S: u64 = 5 * 60; // 5 minutes.

const TMP_AWS_ROOT_CA_FILE: &str = "/tmp/room_monitor/ca.crt";
const TMP_AWS_KEY_STORE_FILE: &str = "/tmp/room_monitor/client.pem.crt";
const TMP_AWS_PRIVATE_KEY_FILE: &str = "/tmp/room_monitor/private.pem.key";

fn main() {
setup_logging();

log::info!("Application started");

let measurement_rate_s = get_measurement_rate();

let server_uri = env::var("MQTT_SERVER_URI").expect("MQTT_SERVER_URI not set");
let client_id = env::var("MQTT_CLIENT_ID").expect("MQTT_CLIENT_ID not set");
let topic = env::var("MQTT_TOPIC").expect("MQTT_TOPIC not set");

set_credential("MQTT_CA_CERT", TMP_AWS_ROOT_CA_FILE);
set_credential("MQTT_CLIENT_CERT", TMP_AWS_KEY_STORE_FILE);
set_credential("MQTT_PRIVATE_KEY", TMP_AWS_PRIVATE_KEY_FILE);

let mqtt_sender = MQTTReadingsSender::new(
&server_uri,
&client_id,
&topic,
TMP_AWS_ROOT_CA_FILE,
TMP_AWS_KEY_STORE_FILE,
TMP_AWS_PRIVATE_KEY_FILE,
)
.expect("Failed to create MQTT sender");

let driver = get_bme280_driver().expect("Failed to get BME280 driver");
let mut runner = Runner::new(driver);
let mut runner = Runner::new(driver, mqtt_sender);

loop {
if let Err(e) = runner.step() {
log::error!("Error steppinf runner: {}", e);
log::error!("Error stepping runner: {}", e);
}

sleep(Duration::from_secs(measurement_rate_s));
Expand Down Expand Up @@ -59,29 +84,56 @@ fn setup_logging() {
.init();
}

struct Runner<T>
struct Runner<T, U>
where
T: BME280Driver,
U: ReadingsSender,
{
driver: T,
sender: U,
}

impl<T> Runner<T>
impl<T, U> Runner<T, U>
where
T: BME280Driver,
U: ReadingsSender,
{
fn new(driver: T) -> Self {
Self { driver }
fn new(driver: T, sender: U) -> Self {
Self { driver, sender }
}

fn step(&mut self) -> anyhow::Result<()> {
let readings = self.driver.read()?;
log::info!(
"{} degC,\t {} hPa,\t {} %",
readings.temperature,
readings.pressure,
readings.humidity
);

if let Err(e) = self.sender.send_readings(&readings) {
log::warn!("Failed to send readings: {}", e);
log::info!(
"Unsent reading: temperature: {}, pressure: {}, humidity: {}",
readings.temperature,
readings.pressure,
readings.humidity
);
}

Ok(())
}
}

pub trait ReadingsSender {
fn send_readings(&self, readings: &driver::Readings) -> anyhow::Result<()>;
}

fn set_credential(env_var: &str, file_name: &str) {
match env::var(env_var) {
Ok(val) => {
// Write the contents of base64-encoded env var to a file.
let engine = base64::engine::general_purpose::STANDARD;
let decoded = engine.decode(val).unwrap();
std::fs::write(file_name, decoded).unwrap();
}
Err(_) => {
log::error!("{} not set", env_var);
std::process::exit(1);
}
}
}
93 changes: 93 additions & 0 deletions src/mqtt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use super::ReadingsSender;
use paho_mqtt as mqtt;
use serde_json;

pub struct MQTTReadingsSender {
client: mqtt::Client,
connection_opts: mqtt::ConnectOptions,
topic: String,
}

impl MQTTReadingsSender {
pub fn new(
server_uri: &str,
client_id: &str,
topic: &str,
ca_file: &str,
key_store_file: &str,
private_key_file: &str,
) -> anyhow::Result<Self> {
match Self::create_mqtt_client(server_uri, client_id) {
Ok(client) => {
match Self::get_connection_opts(ca_file, key_store_file, private_key_file) {
Ok(connection_opts) => Ok(Self {
client,
connection_opts,
topic: topic.to_string(),
}),
Err(e) => Err(anyhow::anyhow!(
"Failed to create MQTT connection options: {}",
e
)),
}
}
Err(e) => Err(anyhow::anyhow!("Failed to create MQTT client: {}", e)),
}
}

fn create_mqtt_client(uri: &str, client_id: &str) -> Result<mqtt::Client, mqtt::Error> {
let opts = mqtt::CreateOptionsBuilder::new_v3()
.server_uri(uri)
.client_id(client_id)
.finalize();

mqtt::Client::new(opts)
}

pub fn get_connection_opts(
ca_file: &str,
key_store_file: &str,
private_key_file: &str,
) -> Result<mqtt::ConnectOptions, mqtt::Error> {
let ssl_opts = mqtt::SslOptionsBuilder::new()
.trust_store(ca_file)?
.key_store(key_store_file)?
.private_key(private_key_file)?
.finalize();

Ok(mqtt::ConnectOptionsBuilder::new()
.ssl_options(ssl_opts)
.clean_session(true)
.finalize())
}
}

impl ReadingsSender for MQTTReadingsSender {
fn send_readings(&self, readings: &crate::driver::Readings) -> anyhow::Result<()> {
let payload = serde_json::to_string(readings).unwrap();

let msg = mqtt::MessageBuilder::new()
.topic(&self.topic)
.payload(payload)
.qos(1)
.finalize();

match self.client.connect(self.connection_opts.clone()) {
Ok(_) => {
if let Err(e) = self.client.publish(msg) {
log::warn!("Failed to publish readings to MQTT broker: {}", e);
}

if let Err(e) = self.client.disconnect(None) {
log::warn!(
"Failed to disconnect from MQTT broker after sending readings: {}",
e
);
}

Ok(())
}
Err(e) => Err(anyhow::anyhow!("Failed to connect to MQTT broker: {}", e)),
}
}
}

0 comments on commit ea12ee1

Please sign in to comment.