Skip to content

Commit

Permalink
happy clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
adria0 authored and adria0 committed Apr 5, 2020
1 parent 919c694 commit e7c748b
Show file tree
Hide file tree
Showing 14 changed files with 122 additions and 102 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/actors/ctrlc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use crate::broker::*;
use crate::error::AnyResult;

pub async fn actor() -> AnyResult<()> {
let mut broker = BROKER.lock().await.register("crtlc",false).await?;
let mut broker = BROKER.lock().await.register("crtlc", false).await?;

let ctrlc = CtrlC::new().expect("cannot create Ctrl+C handler?");
ctrlc.await;
println!("Got CTRL-C, sending termination signal to jobs...");
let _ = broker.ch_broker.send(BrokerEvent::Terminate).await;
Ok(())
}
}
2 changes: 1 addition & 1 deletion src/actors/landiscovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub async fn actor(server_pk: String, rpc_port: u16) -> AnyResult<()> {
};
}
}
let broker = BROKER.lock().await.register("landiscover",false).await?;
let broker = BROKER.lock().await.register("landiscover", false).await?;

let broadcast_list = packets
.iter()
Expand Down
2 changes: 1 addition & 1 deletion src/actors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod ctrlc;
pub mod landiscovery;
pub mod muxrpc;
pub mod sensor;
pub mod rpcs;
pub mod sensor;
23 changes: 12 additions & 11 deletions src/actors/muxrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use kuska_ssb::{
rpc::RpcStream,
};

use crate::storage::ChStoRecv;
use crate::broker::*;
use crate::error::AnyResult;
use crate::storage::ChStoRecv;

use super::rpcs::{RpcInput,RpcHandler,HistoryStreamHandler,GetHandler,WhoAmIHandler};
use super::rpcs::{GetHandler, HistoryStreamHandler, RpcHandler, RpcInput, WhoAmIHandler};

pub async fn actor(server_id: OwnedIdentity, addr: impl ToSocketAddrs) -> AnyResult<()> {
let broker = BROKER.lock().await.register("sbot-listener",false).await?;
let broker = BROKER.lock().await.register("sbot-listener", false).await?;

let mut ch_terminate = broker.ch_terminate.fuse();

Expand Down Expand Up @@ -71,15 +71,16 @@ async fn handle_connection(mut stream: TcpStream, server_id: OwnedIdentity) -> A
actor_id,
..
} = broker;
let res = sbot_loop(
ch_terminate,ch_storage.unwrap(),
&mut api, id, peer_ssb_id).await;
let res = sbot_loop(ch_terminate, ch_storage.unwrap(), &mut api, id, peer_ssb_id).await;

if let Err(err) = res {
warn!("client terminated with error {:?}", err);
}

ch_broker.send(BrokerEvent::Disconnect { actor_id }).await.unwrap();
ch_broker
.send(BrokerEvent::Disconnect { actor_id })
.await
.unwrap();
Ok(())
}

Expand All @@ -96,7 +97,7 @@ async fn sbot_loop<R: Read + Unpin + Send + Sync, W: Write + Unpin + Send + Sync
let mut whoami_handler = WhoAmIHandler::new(&peer_ssb_id);
let mut get_handler = GetHandler::default();

let mut handlers : Vec<&mut dyn RpcHandler<R,W>>= vec![
let mut handlers: Vec<&mut dyn RpcHandler<R, W>> = vec![
&mut history_stream_handler,
&mut whoami_handler,
&mut get_handler,
Expand Down Expand Up @@ -126,15 +127,15 @@ async fn sbot_loop<R: Read + Unpin + Send + Sync, W: Write + Unpin + Send + Sync
};
let mut handled = false;
for handler in handlers.iter_mut() {
if handler.handle(api,&input).await? {
if handler.handle(api, &input).await? {
handled = true;
break;
}
}
if !handled {
trace!("message not processed: {:?}",input);
trace!("message not processed: {:?}", input);
}
}

Ok(())
}
}
20 changes: 8 additions & 12 deletions src/actors/rpcs/base.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
use async_std::{
io::{Read, Write},
};
use async_std::io::{Read, Write};

use kuska_ssb::{
api::ApiHelper,
rpc::RecvMsg,
};
use async_trait::async_trait;
use kuska_ssb::{api::ApiHelper, rpc::RecvMsg};

use crate::storage::StorageEvent;
use crate::error::AnyResult;
use crate::storage::StorageEvent;

#[derive(Debug)]
pub enum RpcInput {
None,
Network(i32,RecvMsg),
Storage(StorageEvent)
Network(i32, RecvMsg),
Storage(StorageEvent),
}

#[async_trait]
pub trait RpcHandler<R: Read + Unpin + Send + Sync, W: Write + Unpin + Send + Sync> : Send + Sync {
pub trait RpcHandler<R: Read + Unpin + Send + Sync, W: Write + Unpin + Send + Sync>:
Send + Sync
{
async fn handle(&mut self, api: &mut ApiHelper<R, W>, op: &RpcInput) -> AnyResult<bool>;
}

21 changes: 10 additions & 11 deletions src/actors/rpcs/get.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
use async_std::{
io::{Read, Write},
};
use async_std::io::{Read, Write};

use async_trait::async_trait;
use kuska_ssb::{
api::{ApiHelper, ApiMethod},
rpc::RecvMsg,
};
use async_trait::async_trait;

use crate::error::AnyResult;
use crate::storage::DB;

use super::{RpcHandler,RpcInput};
pub struct GetHandler{}
use super::{RpcHandler, RpcInput};
pub struct GetHandler {}

impl Default for GetHandler {
fn default() -> Self {
Self{}
Self {}
}
}

#[async_trait]
impl<R: Read + Unpin + Send + Sync, W: Write + Unpin + Send + Sync> RpcHandler<R,W> for GetHandler {
impl<R: Read + Unpin + Send + Sync, W: Write + Unpin + Send + Sync> RpcHandler<R, W>
for GetHandler
{
async fn handle(&mut self, api: &mut ApiHelper<R, W>, op: &RpcInput) -> AnyResult<bool> {
match op {
RpcInput::Network(req_no, RecvMsg::RpcRequest(req)) => {
Expand All @@ -38,11 +38,10 @@ impl<R: Read + Unpin + Send + Sync, W: Write + Unpin + Send + Sync> RpcHandler<R
};
Ok(true)
}
_ => Ok(false)
_ => Ok(false),
}
}
_ => Ok(false)
_ => Ok(false),
}
}
}

67 changes: 39 additions & 28 deletions src/actors/rpcs/history_stream.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::string::ToString;
use std::collections::HashMap;
use std::string::ToString;

use async_std::{
io::{Read, Write},
};
use async_std::io::{Read, Write};
use async_trait::async_trait;

use kuska_ssb::{
Expand All @@ -15,28 +13,30 @@ use crate::error::AnyResult;
use crate::storage::StorageEvent;
use crate::storage::DB;

use super::{RpcHandler,RpcInput};
use super::{RpcHandler, RpcInput};

struct HistoryStreamRequest {
req_no : i32,
args : CreateHistoryStreamArgs,
from : u64,
req_no: i32,
args: CreateHistoryStreamArgs,
from: u64,
}

pub struct HistoryStreamHandler {
reqs : HashMap<String,HistoryStreamRequest>
reqs: HashMap<String, HistoryStreamRequest>,
}

impl Default for HistoryStreamHandler {
fn default() -> Self {
Self {
reqs : HashMap::new()
reqs: HashMap::new(),
}
}
}

#[async_trait]
impl<R: Read + Unpin + Send + Sync , W: Write + Unpin + Send + Sync > RpcHandler<R,W> for HistoryStreamHandler {
impl<R: Read + Unpin + Send + Sync, W: Write + Unpin + Send + Sync> RpcHandler<R, W>
for HistoryStreamHandler
{
async fn handle(&mut self, api: &mut ApiHelper<R, W>, op: &RpcInput) -> AnyResult<bool> {
match op {
RpcInput::Network(req_no, RecvMsg::RpcRequest(req)) => {
Expand All @@ -49,9 +49,11 @@ impl<R: Read + Unpin + Send + Sync , W: Write + Unpin + Send + Sync > RpcHandler
let from = args.seq.unwrap_or(1u64);

let mut req = HistoryStreamRequest {
args, from, req_no: *req_no
args,
from,
req_no: *req_no,
};

self.send_history(api, &mut req).await?;

if req.args.live.unwrap_or(false) {
Expand All @@ -61,12 +63,16 @@ impl<R: Read + Unpin + Send + Sync , W: Write + Unpin + Send + Sync > RpcHandler
}
Ok(true)
}
_ => Ok(false)
_ => Ok(false),
}
}

RpcInput::Network(req_no, RecvMsg::CancelStreamRespose()) => {
let key = self.reqs.iter().find(|(_,v)| v.req_no == *req_no).map(|(k,_)| k.clone());
let key = self
.reqs
.iter()
.find(|(_, v)| v.req_no == *req_no)
.map(|(k, _)| k.clone());
if let Some(key) = key {
api.rpc().send_stream_eof(-req_no).await?;
self.reqs.remove(&key);
Expand All @@ -77,7 +83,11 @@ impl<R: Read + Unpin + Send + Sync , W: Write + Unpin + Send + Sync > RpcHandler
}

RpcInput::Network(req_no, RecvMsg::ErrorResponse(err)) => {
let key = self.reqs.iter().find(|(_,v)| v.req_no == *req_no).map(|(k,_)| k.clone());
let key = self
.reqs
.iter()
.find(|(_, v)| v.req_no == *req_no)
.map(|(k, _)| k.clone());
if let Some(key) = key {
warn!("error {}", err);
self.reqs.remove(&key);
Expand All @@ -90,34 +100,36 @@ impl<R: Read + Unpin + Send + Sync , W: Write + Unpin + Send + Sync > RpcHandler
RpcInput::Storage(StorageEvent::IdChanged(id)) => {
if let Some(mut req) = self.reqs.remove(id) {
self.send_history(api, &mut req).await?;
self.reqs.insert(id.clone(),req);
self.reqs.insert(id.clone(), req);
Ok(true)
} else {
Ok(false)
}
}
_ => Ok(false)
_ => Ok(false),
}
}
}

impl HistoryStreamHandler {
async fn send_history<R: Read + Unpin+ Send + Sync , W: Write + Unpin+ Send + Sync >(
async fn send_history<R: Read + Unpin + Send + Sync, W: Write + Unpin + Send + Sync>(
&mut self,
api: &mut ApiHelper<R, W>,
req : &mut HistoryStreamRequest
) -> AnyResult<()>
{
req: &mut HistoryStreamRequest,
) -> AnyResult<()> {
let req_id = if req.args.id.starts_with('@') {
req.args.id.clone()
} else {
format!("@{}", req.args.id).to_string()
};

let last = DB.read().await.get_feed_len(&req_id)?.map_or(0, |x| x + 1);
let with_keys = req.args.keys.unwrap_or(true);

info!("Sending history stream of {} ({}..{})", req.args.id, req.from, last);

info!(
"Sending history stream of {} ({}..{})",
req.args.id, req.from, last
);
for n in req.from..last {
let data = DB.read().await.get_feed(&req_id, n - 1)?;
let data = if with_keys {
Expand All @@ -128,9 +140,8 @@ impl HistoryStreamHandler {
info!(" - [with_keys={}]{}", with_keys, &data.to_string());
api.feed_res_send(req.req_no, &data).await?;
}

req.from = last;
Ok(())
}
}
}

6 changes: 3 additions & 3 deletions src/actors/rpcs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
mod base;
mod get;
mod history_stream;
mod whoami;
mod base;

pub use base::{RpcInput,RpcHandler};
pub use base::{RpcHandler, RpcInput};
pub use get::GetHandler;
pub use history_stream::HistoryStreamHandler;
pub use whoami::WhoAmIHandler;
pub use whoami::WhoAmIHandler;
Loading

0 comments on commit e7c748b

Please sign in to comment.