Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use healthpi-client to communicate with database #32

Merged
merged 1 commit into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
829 changes: 702 additions & 127 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
[workspace]
members = ["healthpi-api", "healthpi-bt", "healthpi-db", "healthpi-loader", "healthpi-model"]
members = [
"healthpi-api",
"healthpi-bt",
"healthpi-client",
"healthpi-loader",
"healthpi-model",
]
resolver = "2"
11 changes: 9 additions & 2 deletions healthpi-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@ version = "0.1.0"
edition = "2021"

[dependencies]
healthpi-db = { path = "../healthpi-db" }
healthpi-model = { path = "../healthpi-model" }

actix-web = "4.3.0"
actix-cors = "0.6.4"
async-trait = "0.1.63"
chrono = { version = "0.4.19" }
dotenv = "0.15.0"
itertools = "0.10.5"
log = "0.4.17"
log4rs = "1.2.0"
ron = "0.8.0"
rustc-hash = "1.1.0"
serde = { version = "1.0.152", features = ["derive"] }
actix-cors = "0.6.4"
sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "sqlite"] }
tokio = "1.24.2"
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ impl<'r> FromRow<'r, SqliteRow> for RecordRow {
}
}

#[mockall::automock]
#[async_trait]
pub trait MeasurementRepository: Send + Sync {
async fn store_records(&self, records: Vec<Record>) -> Result<(), Box<dyn Error>>;
Expand Down
2 changes: 2 additions & 0 deletions healthpi-api/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub(crate) mod connection;
pub(crate) mod measurement;
26 changes: 21 additions & 5 deletions healthpi-api/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
mod db;

use std::str::FromStr;

use actix_cors::Cors;
use actix_web::{get, web, App, HttpServer, Responder};
use healthpi_db::{
use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder};
use healthpi_model::measurement::{Record, ValueType};
use log::info;
use serde::{de, Deserialize};

use crate::db::{
connection::Connection,
measurement::{MeasurementRepository, MeasurementRepositoryImpl},
};
use healthpi_model::measurement::ValueType;
use log::info;
use serde::{de, Deserialize};

fn comma_separated_value_types<'de, D>(deserializer: D) -> Result<Vec<ValueType>, D::Error>
where
Expand All @@ -22,6 +25,7 @@ where

#[derive(Debug, Deserialize)]
struct Query {
#[serde(default)]
#[serde(deserialize_with = "comma_separated_value_types")]
select: Vec<ValueType>,
}
Expand All @@ -39,6 +43,17 @@ async fn index(
)
}

#[post("/")]
async fn post_measurements(
measurement_repository: web::Data<MeasurementRepositoryImpl>,
measurements: web::Json<Vec<Record>>,
) -> impl Responder {
match measurement_repository.store_records(measurements.0).await {
Ok(_) => HttpResponse::Created().json(()),
Err(_) => HttpResponse::InternalServerError().json(()),
}
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
log4rs::init_file("log4rs.yml", Default::default()).unwrap();
Expand All @@ -53,6 +68,7 @@ async fn main() -> std::io::Result<()> {
.wrap(cors)
.app_data(web::Data::new(measurement_repository.clone()))
.service(index)
.service(post_measurements)
})
.bind(("127.0.0.1", 8080))?
.run()
Expand Down
13 changes: 13 additions & 0 deletions healthpi-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "healthpi-client"
version = "0.1.0"
edition = "2021"

[dependencies]
async-trait = "0.1.79"
healthpi-model = { path = "../healthpi-model", features = ["serde"] }

itertools = "0.12.1"
mockall = "0.12.1"
reqwest = { version = "0.12.3", features = ["json"] }
thiserror = "1.0.58"
78 changes: 78 additions & 0 deletions healthpi-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use async_trait::async_trait;
use healthpi_model::measurement::{Record, ValueType};
use itertools::Itertools;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("failed to communicate with the server")]
ServerError,
}

type Result<T> = std::result::Result<T, Error>;

#[mockall::automock]
#[async_trait]
pub trait Client: Send + Sync {
async fn get_records(&self) -> Result<Vec<Record>>;
async fn get_records_with_value_types(&self, types: &[ValueType]) -> Result<Vec<Record>>;
async fn post_records(&self, records: &[Record]) -> Result<()>;
}

pub struct ClientImpl {
url: String,
client: reqwest::Client,
}

impl ClientImpl {
fn new(url: String) -> Self {
Self {
url,
client: reqwest::Client::new(),
}
}
}

pub fn create(url: String) -> impl Client {
ClientImpl::new(url)
}

#[async_trait]
impl Client for ClientImpl {
async fn get_records(&self) -> Result<Vec<Record>> {
self.client
.get(&self.url)
.send()
.await
.map_err(|_| Error::ServerError)?
.json()
.await
.map_err(|_| Error::ServerError)
}

async fn get_records_with_value_types(&self, types: &[ValueType]) -> Result<Vec<Record>> {
self.client
.get(&self.url)
.query(&[
"select",
&types.iter().map(|t| format!("{:?}", t)).join(","),
])
.send()
.await
.map_err(|_| Error::ServerError)?
.json()
.await
.map_err(|_| Error::ServerError)
}

async fn post_records(&self, records: &[Record]) -> Result<()> {
self.client
.post(&self.url)
.json(&records)
.send()
.await
.map_err(|_| Error::ServerError)?
.json()
.await
.map_err(|_| Error::ServerError)
}
}
3 changes: 3 additions & 0 deletions healthpi-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod client;

pub use client::{create, Client, Error, MockClient};
26 changes: 0 additions & 26 deletions healthpi-db/Cargo.toml

This file was deleted.

2 changes: 0 additions & 2 deletions healthpi-db/src/lib.rs

This file was deleted.

2 changes: 1 addition & 1 deletion healthpi-loader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"

[dependencies]
healthpi-bt = { path = "../healthpi-bt" }
healthpi-db = { path = "../healthpi-db" }
healthpi-client = { path = "../healthpi-client" }
healthpi-model = { path = "../healthpi-model" }

async-trait = "0.1.56"
Expand Down
12 changes: 4 additions & 8 deletions healthpi-loader/src/bin/load_json.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
use std::{error::Error, fs::File, io::BufReader};

use healthpi_db::{
connection::Connection,
measurement::{MeasurementRepository, MeasurementRepositoryImpl},
};
use healthpi_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let file = File::open("data.json")?;
let values = serde_json::from_reader(BufReader::new(file))?;
let db = Connection::establish().await?;
let repository = MeasurementRepositoryImpl::new(db);
repository.store_records(values).await?;
let values: Vec<_> = serde_json::from_reader(BufReader::new(file))?;
let client = healthpi_client::create("http://localhost:8080/".to_owned());
client.post_records(&values).await?;

Ok(())
}
8 changes: 2 additions & 6 deletions healthpi-loader/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use std::sync::{
Arc,
};

use healthpi_db::connection::Connection;
use healthpi_db::measurement::MeasurementRepositoryImpl;
use log::info;

use healthpi_loader::devices::device::FactoryImpl;
Expand All @@ -15,9 +13,7 @@ use healthpi_loader::Loader;
async fn main() -> Result<(), Box<dyn Error>> {
log4rs::init_file("log4rs.yml", Default::default())?;

info!("Connecting to database");
let conn = Connection::establish().await?;
let measurement_repository = Box::new(MeasurementRepositoryImpl::new(conn.clone()));
let api_client = Box::new(healthpi_client::create("http://localhost:8080/".to_owned()));

info!("Starting Bluetooth session");
let ble_session = healthpi_bt::create_session().await?;
Expand All @@ -27,7 +23,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let loader = Arc::new(Loader::new(
ble_session,
factory,
measurement_repository,
api_client,
running.clone(),
));
ctrlc::set_handler(move || running.store(false, Ordering::Relaxed))?;
Expand Down
9 changes: 4 additions & 5 deletions healthpi-loader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::{

use futures::lock::Mutex;
use healthpi_bt::BleSession;
use healthpi_db::measurement::MeasurementRepository;
use log::{debug, error, info};
use tokio::time;

Expand All @@ -20,21 +19,21 @@ use crate::devices::device::Factory;
pub struct Loader {
ble_session: Box<dyn BleSession>,
factory: Arc<Mutex<Box<dyn Factory>>>,
repository: Box<dyn MeasurementRepository>,
api_client: Box<dyn healthpi_client::Client>,
running: Arc<AtomicBool>,
}

impl Loader {
pub fn new(
ble_session: Box<dyn BleSession>,
factory: Box<dyn Factory>,
repository: Box<dyn MeasurementRepository>,
api_client: Box<dyn healthpi_client::Client>,
running: Arc<AtomicBool>,
) -> Self {
Self {
ble_session,
factory: Arc::new(Mutex::new(factory)),
repository,
api_client,
running,
}
}
Expand Down Expand Up @@ -82,7 +81,7 @@ impl Loader {
device.disconnect().await?;

info!("Storing records in database");
if let Err(e) = self.repository.store_records(records).await {
if let Err(e) = self.api_client.post_records(&records).await {
error!("Failed to store records in database, skipping. {}", e);
continue;
}
Expand Down
5 changes: 2 additions & 3 deletions healthpi-loader/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::sync::{
use chrono::Utc;
use futures::stream;
use healthpi_bt::{BleCharacteristicEvent, MockBleCharacteristic, MockBleDevice, MockBleSession};
use healthpi_db::measurement::MockMeasurementRepository;
use healthpi_loader::{
devices::{device::MockFactory, soehnle::Shape200},
Loader,
Expand Down Expand Up @@ -67,9 +66,9 @@ async fn shape_200_returns_no_records() {
.returning(|ble_device| Some(Box::new(Shape200::new(ble_device))));
factory.expect_mark_processed().returning(|_| Utc::now());

let mut measurement_repository = MockMeasurementRepository::new();
let mut measurement_repository = healthpi_client::MockClient::new();
measurement_repository
.expect_store_records()
.expect_post_records()
.with(eq(vec![]))
.returning(|_| Ok(()));

Expand Down
4 changes: 4 additions & 0 deletions log4rs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ loggers:
level: error
btleplug:
level: error
reqwest:
level: error
hyper_util:
level: error
Loading