From e7c748b951b2c3e4dbb2f615ca5faa61acdea4bb Mon Sep 17 00:00:00 2001 From: adria0 Date: Sun, 5 Apr 2020 15:09:46 +0200 Subject: [PATCH] happy clippy --- Cargo.lock | 2 +- src/actors/ctrlc.rs | 4 +- src/actors/landiscovery.rs | 2 +- src/actors/mod.rs | 2 +- src/actors/muxrpc.rs | 23 ++++++----- src/actors/rpcs/base.rs | 20 ++++----- src/actors/rpcs/get.rs | 21 +++++----- src/actors/rpcs/history_stream.rs | 67 ++++++++++++++++++------------- src/actors/rpcs/mod.rs | 6 +-- src/actors/rpcs/whoami.rs | 23 ++++++----- src/actors/sensor.rs | 4 +- src/broker.rs | 16 ++++---- src/main.rs | 8 ++-- src/storage.rs | 26 ++++++++---- 14 files changed, 122 insertions(+), 102 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a489d97..daec475 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -666,7 +666,7 @@ dependencies = [ [[package]] name = "kuska-ssb" -version = "0.1.3" +version = "0.1.4" dependencies = [ "async-std", "base64 0.11.0", diff --git a/src/actors/ctrlc.rs b/src/actors/ctrlc.rs index cf4a703..2c7cdf8 100644 --- a/src/actors/ctrlc.rs +++ b/src/actors/ctrlc.rs @@ -5,11 +5,11 @@ use crate::broker::*; use crate::error::AnyResult; pub async fn actor() -> AnyResult<()> { - let mut broker = BROKER.lock().await.register("crtlc",false).await?; + let mut broker = BROKER.lock().await.register("crtlc", false).await?; let ctrlc = CtrlC::new().expect("cannot create Ctrl+C handler?"); ctrlc.await; println!("Got CTRL-C, sending termination signal to jobs..."); let _ = broker.ch_broker.send(BrokerEvent::Terminate).await; Ok(()) -} \ No newline at end of file +} diff --git a/src/actors/landiscovery.rs b/src/actors/landiscovery.rs index 71a7beb..d99eea5 100644 --- a/src/actors/landiscovery.rs +++ b/src/actors/landiscovery.rs @@ -36,7 +36,7 @@ pub async fn actor(server_pk: String, rpc_port: u16) -> AnyResult<()> { }; } } - let broker = BROKER.lock().await.register("landiscover",false).await?; + let broker = BROKER.lock().await.register("landiscover", false).await?; let broadcast_list = packets .iter() diff --git a/src/actors/mod.rs b/src/actors/mod.rs index 52abd7e..f73a98a 100644 --- a/src/actors/mod.rs +++ b/src/actors/mod.rs @@ -1,5 +1,5 @@ pub mod ctrlc; pub mod landiscovery; pub mod muxrpc; -pub mod sensor; pub mod rpcs; +pub mod sensor; diff --git a/src/actors/muxrpc.rs b/src/actors/muxrpc.rs index 616190e..8c5e633 100644 --- a/src/actors/muxrpc.rs +++ b/src/actors/muxrpc.rs @@ -14,14 +14,14 @@ use kuska_ssb::{ rpc::RpcStream, }; -use crate::storage::ChStoRecv; use crate::broker::*; use crate::error::AnyResult; +use crate::storage::ChStoRecv; -use super::rpcs::{RpcInput,RpcHandler,HistoryStreamHandler,GetHandler,WhoAmIHandler}; +use super::rpcs::{GetHandler, HistoryStreamHandler, RpcHandler, RpcInput, WhoAmIHandler}; pub async fn actor(server_id: OwnedIdentity, addr: impl ToSocketAddrs) -> AnyResult<()> { - let broker = BROKER.lock().await.register("sbot-listener",false).await?; + let broker = BROKER.lock().await.register("sbot-listener", false).await?; let mut ch_terminate = broker.ch_terminate.fuse(); @@ -71,15 +71,16 @@ async fn handle_connection(mut stream: TcpStream, server_id: OwnedIdentity) -> A actor_id, .. } = broker; - let res = sbot_loop( - ch_terminate,ch_storage.unwrap(), - &mut api, id, peer_ssb_id).await; + let res = sbot_loop(ch_terminate, ch_storage.unwrap(), &mut api, id, peer_ssb_id).await; if let Err(err) = res { warn!("client terminated with error {:?}", err); } - ch_broker.send(BrokerEvent::Disconnect { actor_id }).await.unwrap(); + ch_broker + .send(BrokerEvent::Disconnect { actor_id }) + .await + .unwrap(); Ok(()) } @@ -96,7 +97,7 @@ async fn sbot_loop>= vec![ + let mut handlers: Vec<&mut dyn RpcHandler> = vec![ &mut history_stream_handler, &mut whoami_handler, &mut get_handler, @@ -126,15 +127,15 @@ async fn sbot_loop : Send + Sync { +pub trait RpcHandler: + Send + Sync +{ async fn handle(&mut self, api: &mut ApiHelper, op: &RpcInput) -> AnyResult; } - diff --git a/src/actors/rpcs/get.rs b/src/actors/rpcs/get.rs index c7e2845..b5bcecb 100644 --- a/src/actors/rpcs/get.rs +++ b/src/actors/rpcs/get.rs @@ -1,27 +1,27 @@ -use async_std::{ - io::{Read, Write}, -}; +use async_std::io::{Read, Write}; +use async_trait::async_trait; use kuska_ssb::{ api::{ApiHelper, ApiMethod}, rpc::RecvMsg, }; -use async_trait::async_trait; use crate::error::AnyResult; use crate::storage::DB; -use super::{RpcHandler,RpcInput}; -pub struct GetHandler{} +use super::{RpcHandler, RpcInput}; +pub struct GetHandler {} impl Default for GetHandler { fn default() -> Self { - Self{} + Self {} } } #[async_trait] -impl RpcHandler for GetHandler { +impl RpcHandler + for GetHandler +{ async fn handle(&mut self, api: &mut ApiHelper, op: &RpcInput) -> AnyResult { match op { RpcInput::Network(req_no, RecvMsg::RpcRequest(req)) => { @@ -38,11 +38,10 @@ impl RpcHandler Ok(false) + _ => Ok(false), } } - _ => Ok(false) + _ => Ok(false), } } } - diff --git a/src/actors/rpcs/history_stream.rs b/src/actors/rpcs/history_stream.rs index f2ce009..e17d562 100644 --- a/src/actors/rpcs/history_stream.rs +++ b/src/actors/rpcs/history_stream.rs @@ -1,9 +1,7 @@ -use std::string::ToString; use std::collections::HashMap; +use std::string::ToString; -use async_std::{ - io::{Read, Write}, -}; +use async_std::io::{Read, Write}; use async_trait::async_trait; use kuska_ssb::{ @@ -15,28 +13,30 @@ use crate::error::AnyResult; use crate::storage::StorageEvent; use crate::storage::DB; -use super::{RpcHandler,RpcInput}; +use super::{RpcHandler, RpcInput}; struct HistoryStreamRequest { - req_no : i32, - args : CreateHistoryStreamArgs, - from : u64, + req_no: i32, + args: CreateHistoryStreamArgs, + from: u64, } pub struct HistoryStreamHandler { - reqs : HashMap + reqs: HashMap, } impl Default for HistoryStreamHandler { fn default() -> Self { Self { - reqs : HashMap::new() + reqs: HashMap::new(), } } } #[async_trait] -impl RpcHandler for HistoryStreamHandler { +impl RpcHandler + for HistoryStreamHandler +{ async fn handle(&mut self, api: &mut ApiHelper, op: &RpcInput) -> AnyResult { match op { RpcInput::Network(req_no, RecvMsg::RpcRequest(req)) => { @@ -49,9 +49,11 @@ impl RpcHandler let from = args.seq.unwrap_or(1u64); let mut req = HistoryStreamRequest { - args, from, req_no: *req_no + args, + from, + req_no: *req_no, }; - + self.send_history(api, &mut req).await?; if req.args.live.unwrap_or(false) { @@ -61,12 +63,16 @@ impl RpcHandler } Ok(true) } - _ => Ok(false) + _ => Ok(false), } } RpcInput::Network(req_no, RecvMsg::CancelStreamRespose()) => { - let key = self.reqs.iter().find(|(_,v)| v.req_no == *req_no).map(|(k,_)| k.clone()); + let key = self + .reqs + .iter() + .find(|(_, v)| v.req_no == *req_no) + .map(|(k, _)| k.clone()); if let Some(key) = key { api.rpc().send_stream_eof(-req_no).await?; self.reqs.remove(&key); @@ -77,7 +83,11 @@ impl RpcHandler } RpcInput::Network(req_no, RecvMsg::ErrorResponse(err)) => { - let key = self.reqs.iter().find(|(_,v)| v.req_no == *req_no).map(|(k,_)| k.clone()); + let key = self + .reqs + .iter() + .find(|(_, v)| v.req_no == *req_no) + .map(|(k, _)| k.clone()); if let Some(key) = key { warn!("error {}", err); self.reqs.remove(&key); @@ -90,34 +100,36 @@ impl RpcHandler RpcInput::Storage(StorageEvent::IdChanged(id)) => { if let Some(mut req) = self.reqs.remove(id) { self.send_history(api, &mut req).await?; - self.reqs.insert(id.clone(),req); + self.reqs.insert(id.clone(), req); Ok(true) } else { Ok(false) } } - _ => Ok(false) + _ => Ok(false), } } } impl HistoryStreamHandler { - async fn send_history( + async fn send_history( &mut self, api: &mut ApiHelper, - req : &mut HistoryStreamRequest - ) -> AnyResult<()> - { + req: &mut HistoryStreamRequest, + ) -> AnyResult<()> { let req_id = if req.args.id.starts_with('@') { req.args.id.clone() } else { format!("@{}", req.args.id).to_string() }; - + let last = DB.read().await.get_feed_len(&req_id)?.map_or(0, |x| x + 1); let with_keys = req.args.keys.unwrap_or(true); - - info!("Sending history stream of {} ({}..{})", req.args.id, req.from, last); + + info!( + "Sending history stream of {} ({}..{})", + req.args.id, req.from, last + ); for n in req.from..last { let data = DB.read().await.get_feed(&req_id, n - 1)?; let data = if with_keys { @@ -128,9 +140,8 @@ impl HistoryStreamHandler { info!(" - [with_keys={}]{}", with_keys, &data.to_string()); api.feed_res_send(req.req_no, &data).await?; } - + req.from = last; Ok(()) - } + } } - diff --git a/src/actors/rpcs/mod.rs b/src/actors/rpcs/mod.rs index 14598f1..c6cb867 100644 --- a/src/actors/rpcs/mod.rs +++ b/src/actors/rpcs/mod.rs @@ -1,9 +1,9 @@ +mod base; mod get; mod history_stream; mod whoami; -mod base; -pub use base::{RpcInput,RpcHandler}; +pub use base::{RpcHandler, RpcInput}; pub use get::GetHandler; pub use history_stream::HistoryStreamHandler; -pub use whoami::WhoAmIHandler; \ No newline at end of file +pub use whoami::WhoAmIHandler; diff --git a/src/actors/rpcs/whoami.rs b/src/actors/rpcs/whoami.rs index 4fe9ca7..7b242b9 100644 --- a/src/actors/rpcs/whoami.rs +++ b/src/actors/rpcs/whoami.rs @@ -1,6 +1,4 @@ -use async_std::{ - io::{Read, Write}, -}; +use async_std::io::{Read, Write}; use async_trait::async_trait; use kuska_ssb::{ @@ -8,33 +6,36 @@ use kuska_ssb::{ rpc::RecvMsg, }; +use super::{RpcHandler, RpcInput}; use crate::error::AnyResult; -use super::{RpcHandler,RpcInput}; pub struct WhoAmIHandler<'a> { - peer_ssb_id: &'a String + peer_ssb_id: &'a str, } impl<'a> WhoAmIHandler<'a> { - pub fn new(peer_ssb_id : &'a String) -> Self { + pub fn new(peer_ssb_id: &'a str) -> Self { Self { peer_ssb_id } } } #[async_trait] -impl<'a,R: Read + Unpin+ Send + Sync , W: Write + Unpin+ Send + Sync > RpcHandler for WhoAmIHandler<'a> { +impl<'a, R: Read + Unpin + Send + Sync, W: Write + Unpin + Send + Sync> RpcHandler + for WhoAmIHandler<'a> +{ async fn handle(&mut self, api: &mut ApiHelper, op: &RpcInput) -> AnyResult { match op { RpcInput::Network(req_no, RecvMsg::RpcRequest(req)) => { match ApiMethod::from_rpc_body(req) { Some(ApiMethod::WhoAmI) => { - api.whoami_res_send(*req_no, self.peer_ssb_id.clone()).await?; + api.whoami_res_send(*req_no, self.peer_ssb_id.to_string()) + .await?; Ok(true) } - _ => Ok(false) + _ => Ok(false), } } - _ => Ok(false) + _ => Ok(false), } } -} \ No newline at end of file +} diff --git a/src/actors/sensor.rs b/src/actors/sensor.rs index 8d3e90e..8474150 100644 --- a/src/actors/sensor.rs +++ b/src/actors/sensor.rs @@ -6,11 +6,11 @@ use futures::FutureExt; use kuska_ssb::{api::msgs::Post, feed::Message, keystore::OwnedIdentity}; use crate::broker::*; -use crate::storage::DB; use crate::error::AnyResult; +use crate::storage::DB; pub async fn actor(server_id: OwnedIdentity) -> AnyResult<()> { - let broker = BROKER.lock().await.register("sensor",false).await?; + let broker = BROKER.lock().await.register("sensor", false).await?; let mut ch_terminate = broker.ch_terminate.fuse(); loop { diff --git a/src/broker.rs b/src/broker.rs index 8528722..1c3c066 100644 --- a/src/broker.rs +++ b/src/broker.rs @@ -6,7 +6,7 @@ use once_cell::sync::Lazy; use std::collections::hash_map::HashMap; use crate::error::AnyResult; -use crate::storage::{StorageEvent,ChStoSend,ChStoRecv}; +use crate::storage::{ChStoRecv, ChStoSend, StorageEvent}; #[derive(Debug)] pub struct Void {} @@ -64,7 +64,7 @@ impl Broker { pub fn take_msgloop(&mut self) -> JoinHandle<()> { self.msgloop.take().unwrap() } - pub async fn register(&mut self, name: &str, storage_notify :bool) -> AnyResult { + pub async fn register(&mut self, name: &str, storage_notify: bool) -> AnyResult { self.last_actor_id += 1; info!("Registering actor {}={}", self.last_actor_id, name); @@ -73,10 +73,10 @@ impl Broker { let (terminated_sender, terminated_receiver) = oneshot::channel::(); let (sto_sender, sto_receiver) = if storage_notify { - let (s,r) = mpsc::unbounded::(); - (Some(s),Some(r)) + let (s, r) = mpsc::unbounded::(); + (Some(s), Some(r)) } else { - (None,None) + (None, None) }; let broker_endpoint = BrokerEndpoint { @@ -90,7 +90,7 @@ impl Broker { ch_broker: self.sender.clone(), ch_terminate: terminate_receiver, ch_terminated: terminated_sender, - ch_storage : sto_receiver, + ch_storage: sto_receiver, }; self.sender @@ -142,7 +142,7 @@ impl Broker { if let Some(ch) = &mut actor.ch_storage { let _ = ch.send(event.clone()).await; } - } + } } } } @@ -170,4 +170,4 @@ impl Broker { } drop(actors); } -} \ No newline at end of file +} diff --git a/src/main.rs b/src/main.rs index 34062db..13c5bbb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,12 +12,12 @@ use kuska_ssb::keystore::{read_patchwork_config, write_patchwork_config, OwnedId mod actors; mod broker; -mod storage; mod error; +mod storage; use broker::*; -use storage::DB; use error::AnyResult; +use storage::DB; const LISTEN: &str = "0.0.0.0:8008"; const RPC_PORT: u16 = 8008; @@ -55,7 +55,9 @@ async fn main() -> AnyResult<()> { base64::encode(&server_id.pk[..]) ); - DB.write().await.open(&db_folder,BROKER.lock().await.create_sender())?; + DB.write() + .await + .open(&db_folder, BROKER.lock().await.create_sender())?; Broker::spawn(actors::ctrlc::actor()); Broker::spawn(actors::landiscovery::actor( diff --git a/src/storage.rs b/src/storage.rs index 18650e3..16bf458 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,11 +1,11 @@ use async_std::sync::{Arc, RwLock}; +use futures::SinkExt; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use serde_cbor; -use futures::SinkExt; +use crate::broker::{BrokerEvent, ChBrokerSend}; use kuska_ssb::feed::{Feed, Message}; -use crate::broker::{ChBrokerSend,BrokerEvent}; use futures::channel::mpsc; @@ -13,9 +13,9 @@ const PREFIX_LASTFEED: u8 = 0u8; const PREFIX_FEED: u8 = 1u8; const PREFIX_MESSAGE: u8 = 2u8; -#[derive(Debug,Clone)] +#[derive(Debug, Clone)] pub enum StorageEvent { - IdChanged(String) + IdChanged(String), } pub type ChStoRecv = mpsc::UnboundedReceiver; @@ -71,10 +71,13 @@ pub type Result = std::result::Result; impl Storage { pub fn default() -> Self { - Self { db: None, ch_broker : None} + Self { + db: None, + ch_broker: None, + } } - pub fn open(&mut self, path: &std::path::Path, ch_broker : ChBrokerSend) -> Result<()> { + pub fn open(&mut self, path: &std::path::Path, ch_broker: ChBrokerSend) -> Result<()> { self.db = Some(sled::open(path)?); self.ch_broker = Some(ch_broker); Ok(()) @@ -150,7 +153,14 @@ impl Storage { db.insert(Self::key_feed(&author, seq_no), feed.to_string().as_bytes())?; db.insert(Self::key_lastfeed(&author), &seq_no.to_be_bytes()[..])?; - self.ch_broker.as_ref().unwrap().send(BrokerEvent::Storage(StorageEvent::IdChanged(msg.author().clone()))).await.unwrap(); + self.ch_broker + .as_ref() + .unwrap() + .send(BrokerEvent::Storage(StorageEvent::IdChanged( + msg.author().clone(), + ))) + .await + .unwrap(); Ok(seq_no) } -} \ No newline at end of file +}