Skip to content

Commit

Permalink
Connect to the DB and try to write kvdata to the flags schema
Browse files Browse the repository at this point in the history
  • Loading branch information
lcoram committed Jun 19, 2024
1 parent 4dd172e commit 9465f62
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 41 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions db/flags.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Table of kvalobs QC (flag) data, one row per (timeseries, observation time).
CREATE TABLE IF NOT EXISTS flags.kvdata (
    -- NOTE: this is the appropriate way to tie the QC data to a timeseries, or do we want to reference through a label somehow?
    timeseries INT4 REFERENCES public.timeseries,
    obstime TIMESTAMPTZ NOT NULL,
    -- DOUBLE PRECISION to match the f64 values the ingestor binds;
    -- REAL (f32) would make tokio-postgres reject the parameters at runtime.
    original DOUBLE PRECISION NULL,
    corrected DOUBLE PRECISION NULL,
    controlinfo TEXT NULL,
    useinfo TEXT NULL,
    -- TEXT to match the Option<String> the ingestor binds (was INT4).
    cfailed TEXT NULL,
    -- A timeseries has many observation times, so the key must be composite;
    -- a PRIMARY KEY on timeseries alone would reject the second row per series.
    PRIMARY KEY (timeseries, obstime)
);
-- No separate index needed: the composite primary key already indexes
-- (timeseries, obstime), and its prefix serves timeseries-only lookups.
5 changes: 4 additions & 1 deletion kafka_checked/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ kafka = "0.10.0"
quick-xml = { version = "0.32.0", features = [ "serialize", "overlapped-lists" ] }
substring = "1.4.5"
chrono.workspace = true
serde.workspace = true
serde.workspace = true
tokio.workspace = true
tokio-postgres.workspace = true
thiserror.workspace = true
156 changes: 116 additions & 40 deletions kafka_checked/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,25 @@
// https://gitlab.met.no/tjenester/oda/-/blob/dev/base/oda-kvkafka/deployment.yaml?ref_type=heads

use chrono::NaiveDateTime;
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use quick_xml::de::from_str;
use serde::Deserialize;
use std::env;
use substring::Substring;
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use thiserror::Error;
use tokio_postgres::NoTls;

#[derive(Error, Debug)]
pub enum Error {
#[error("postgres returned an error: {0}")]
Database(#[from] tokio_postgres::Error),
#[error(
"no Timeseries found for this data - station {:?}, param {:?}",
station,
param
)]
NoTs { station: i32, param: i32 },
}

#[derive(Debug, Deserialize)]
/// Represents <KvalobsData>...</KvalobsData>
Expand All @@ -19,7 +34,7 @@ struct Stations {
val: i32,
typeid: Vec<Typeid>,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize)]
/// Represents <typeid>...</typeid>
struct Typeid {
#[serde(rename = "@val")]
Expand Down Expand Up @@ -64,66 +79,104 @@ struct Kvdata {
corrected: Option<f64>,
controlinfo: Option<String>,
useinfo: Option<String>,
cfailed:Option<String>,
cfailed: Option<String>,
}
/// Represents <kvtextdata>...</kvtextdata>
#[derive(Debug, Deserialize)]
struct Kvtextdata {
paramid: Option<i32>,
original: String,
struct Kvtextdata {
paramid: Option<i32>,
original: String,
}
#[derive(Debug, Deserialize)]
struct Tsid {
station: i32,
paramid: i32,
typeid: i32,
sensor: i32,
level: i32,
level: i32,
}

fn main() {

let mut consumer =
Consumer::from_hosts(vec!("kafka2-a1.met.no:9092".to_owned()))
.with_topic_partitions("kvalobs.production.checked".to_owned(), &[0, 1])
.with_fallback_offset(FetchOffset::Earliest)
.with_group("lard-test".to_owned())
.with_offset_storage(Some(GroupOffsetStorage::Kafka))
.create()
.unwrap();
#[tokio::main]
async fn main() -> Result<(), Error> {
let args: Vec<String> = env::args().collect();

// for debugging to limit the number of loops (if removed then consume the kafka queue infinitely)
let mut i = 1;
if args.len() < 4 {
panic!("not enough args passed in, at least host, user, dbname needed, optionally password")
}

let mut connect_string = format!("host={} user={} dbname={}", &args[1], &args[2], &args[3]);
if args.len() > 4 {
connect_string.push_str(" password=");
connect_string.push_str(&args[4])
}

// Connect to the database.
let (client, connection) = tokio_postgres::connect(connect_string.as_str(), NoTls).await?;

tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});

// NOTE: reading from the 4 redundant kafka queues, but only reading the checked data (other topics exists)
let mut consumer = Consumer::from_hosts(vec![
"kafka2-a1.met.no:9092".to_owned(),
"kafka2-a2.met.no:9092".to_owned(),
"kafka2-b1.met.no:9092".to_owned(),
"kafka2-b2.met.no:9092".to_owned(),
])
.with_topic_partitions("kvalobs.production.checked".to_owned(), &[0, 1])
.with_fallback_offset(FetchOffset::Earliest)
.with_group("lard-test".to_owned())
.with_offset_storage(Some(GroupOffsetStorage::Kafka))
.create()
.unwrap();

// Consume the kafka queue infinitely
loop {

for ms in consumer.poll().unwrap().iter() {

for m in ms.messages() {
// do some basic trimming / processing of message
let xmlmsg = std::str::from_utf8(m.value).unwrap().trim().replace('\n', "").replace("\\", "");
let xmlmsg = std::str::from_utf8(m.value)
.unwrap()
.trim()
.replace('\n', "")
.replace("\\", "");

// do some checking / further processing of message
if !xmlmsg.starts_with("<?xml") {
println!("{:?}", "kv2kvdata must be xml starting with '<?xml '");
}
// dangerous unwrap? but expect to find it if have found the first half?
let kvalobs_xmlmsg = xmlmsg.substring(xmlmsg.find("?>").unwrap()+2, xmlmsg.len());
let kvalobs_xmlmsg = xmlmsg.substring(xmlmsg.find("?>").unwrap() + 2, xmlmsg.len());
//println!("kvalobsdata message: {:?} \n", kvalobs_xmlmsg);

let item: KvalobsData = from_str(kvalobs_xmlmsg).unwrap();
//println!("kvalobsdata item: {:?} \n", item);

// get the useful stuff out of this struct?
// get the useful stuff out of this struct
for station in item.station {
for typeid in station.typeid {
for obstime in typeid.obstime {
let obs_time = NaiveDateTime::parse_from_str(&obstime.val, "%Y-%m-%d %H:%M:%S").unwrap();
let obs_time =
NaiveDateTime::parse_from_str(&obstime.val, "%Y-%m-%d %H:%M:%S")
.unwrap();
println!("ObsTime: {:?} \n", obs_time);
for tbtime in obstime.tbtime {
if let Some(textdata) = tbtime.kvtextdata {
// TODO: how do we handle text data?
println!("textdata: {:?} \n", textdata);
let tb_time = NaiveDateTime::parse_from_str(
&tbtime.val,
"%Y-%m-%d %H:%M:%S%.6f",
)
.unwrap();
// NOTE: this is "table time" which can vary from the actual observation time,
// its the first time in entered the db in kvalobs
println!("TbTime: {:?} \n", tb_time);
if let Some(textdata) = tbtime.kvtextdata {
// TODO: Do we want to handle text data at all, it doesn't seem to be QCed
println!(
"station, typeid, textdata: {:?} {:?} {:?} \n",
station.val, typeid.val, textdata
);
}
for sensor in tbtime.sensor {
for level in sensor.level {
Expand All @@ -137,8 +190,16 @@ fn main() {
level: level.val,
};
println!("Timeseries ID: {:?} \n", tsid);
println!("data: {:?} \n", d);
// TODO: write into db
println!("Data: {:?} \n", d);
// Try to write into db
let _ = insert_kvdata(&client, tsid, obs_time, d)
.await
.map_err(|e| {
eprintln!(
"Writing to database error: {:?}",
e
)
});
}
}
}
Expand All @@ -153,14 +214,29 @@ fn main() {
println!("{:?}", result.err());
}
}
consumer.commit_consumed().unwrap();

// for debugging
i -= 1;

if i == 0 {
// exit loop
break;
}
consumer.commit_consumed().unwrap(); // ensure we keep offset
}

Ok(())
}

async fn insert_kvdata(
conn: &tokio_postgres::Client,
tsid: Tsid,
obstime: chrono::NaiveDateTime,
kvdata: Kvdata,
) -> Result<(), Error> {
// what timeseries is this?
let tsid: i64 = conn.query(
"SELECT timeseries FROM labels.met WHERE station_id = $1 AND param_id = $2 AND type_id = $3 AND lvl = $4 AND sensor = $5",
&[&tsid.station, &tsid.paramid, &tsid.typeid, &tsid.level, &tsid.sensor],
).await?.first().ok_or( Error::NoTs{station: tsid.station, param: tsid.paramid})?.get(0);

// write the data into the db
conn.execute(
"INSERT INTO flags.kvdata (timeseries, obstime, original, corrected, controlinfo, useinfo, cfailed) VALUES($1, $2, $3, $4, $5, $6, $7)",
&[&tsid, &obstime, &kvdata.original, &kvdata.corrected, &kvdata.controlinfo, &kvdata.useinfo, &kvdata.cfailed],
).await?;

Ok(())
}

0 comments on commit 9465f62

Please sign in to comment.