Skip to content

Commit

Permalink
Merge pull request #29 from sideprotocol/push-all-packages
Browse files Browse the repository at this point in the history
Refactor the heartbeat and signature aggregation mechanisms to tolerate network delays.
  • Loading branch information
lazyluis authored Nov 13, 2024
2 parents d21826a + 073cf63 commit fb1843a
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 154 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shuttler"
version = "0.3.0"
version = "0.3.1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand All @@ -25,8 +25,8 @@ libp2p = {version = "0.54.1", features = [ "tokio", "gossipsub", "identify", "md
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
toml = "0.8.14"
frost-core = { git="https://github.com/liangping/frost.git", branch = "frost-secp256k1-tr", features=["serde", "serialization", "internals"] }
frost-secp256k1-tr = { git="https://github.com/liangping/frost.git", branch = "frost-secp256k1-tr", features=["serde", "serialization"] }
frost-core = { git="https://github.com/sideprotocol/frost.git", features=["serde", "serialization", "internals"] }
frost-secp256k1-tr = { git="https://github.com/sideprotocol/frost.git", features=["serde", "serialization"] }
k256 = { version = "0.13.0", features = ["arithmetic", "expose-field", "hash2curve"] }
rand_core = "0.6"
rand_chacha = "0.3.1"
Expand Down
33 changes: 20 additions & 13 deletions src/app/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use tokio::select;
use usize as Index;
use tracing::{debug, error, info, warn};

use ed25519_compact::SecretKey;
use ed25519_compact::{PublicKey, SecretKey, Signature};

use lazy_static::lazy_static;

Expand Down Expand Up @@ -318,6 +318,7 @@ impl Signer {
pub fn remove_signing_task(&self, task_id: &str) {
self.db_sign.remove(task_id).expect("Unable to remove task");
self.remove_signing_task_variables(task_id);
mem_store::remove_task_participants(task_id);
}

pub fn remove_signing_task_variables(&self, task_id: &str) {
Expand Down Expand Up @@ -379,11 +380,7 @@ pub async fn run_signer_daemon(conf: Config, seed: bool) {
let signer = Signer::new(conf.clone());

for (i, (addr, vkp) ) in signer.list_keypairs().iter().enumerate() {
debug!("Vault {i}. {addr}");
// maintain a permission white list for heartbeat
vkp.pub_key.verifying_shares().keys().for_each(|identifier| {
mem_store::update_alive_table(HeartBeatMessage { identifier: identifier.clone(), last_seen: 0 });
});
debug!("Vault {i}. {addr}, ({}-of-{})", vkp.priv_key.min_signers(), vkp.pub_key.verifying_shares().len());
}

let libp2p_keypair = Keypair::from_protobuf_encoding(from_base64(&conf.p2p_keypair).unwrap().as_slice()).unwrap();
Expand All @@ -410,7 +407,7 @@ pub async fn run_signer_daemon(conf: Config, seed: bool) {

// Set a custom gossipsub configuration
let gossipsub_config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the log space
.heartbeat_interval(Duration::from_secs(60)) // This is set to aid debugging by not cluttering the log space
.validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
.message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated.
.max_transmit_size(1000000)
Expand Down Expand Up @@ -464,12 +461,8 @@ pub async fn run_signer_daemon(conf: Config, seed: bool) {
},
SwarmEvent::ConnectionEstablished { peer_id, endpoint, ..} => {
swarm.behaviour_mut().gossip.add_explicit_peer(&peer_id);
let connected = swarm.connected_peers().map(|p| p.clone()).collect::<Vec<_>>();
if connected.len() > 0 {
swarm.behaviour_mut().identify.push(connected);
}
let addr = endpoint.get_remote_address();
info!("Connected to {:?}/p2p/{peer_id}, ", addr);
info!("Connected to {:?}, ", addr);
},
SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
info!("Disconnected {peer_id}: {:?}", cause);
Expand Down Expand Up @@ -524,7 +517,21 @@ async fn event_handler(event: TSSBehaviourEvent, swarm: &mut Swarm<TSSBehaviour>
}
} else if message.topic == SubscribeTopic::ALIVE.topic().hash() {
if let Ok(alive) = serde_json::from_slice::<HeartBeatMessage>(&message.data) {
mem_store::update_alive_table( alive );
// Ensure the message is not forged.
match PublicKey::from_slice(&alive.payload.identifier.serialize()) {
Ok(public_key) => {
let sig = Signature::from_slice(&alive.signature).unwrap();
let bytes = serde_json::to_vec(&alive.payload).unwrap();
if public_key.verify(bytes, &sig).is_err() {
debug!("Reject, untrusted package from {:?}", alive.payload.identifier);
return;
}
}
Err(_) => return
}
if mem_store::is_peer_trusted_peer(&alive.payload.identifier, signer) {
mem_store::update_alive_table( alive );
}
}
}
}
Expand Down
37 changes: 19 additions & 18 deletions src/helper/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@

use cosmos_sdk_proto::cosmos::base::tendermint::v1beta1::{service_client::ServiceClient as TendermintServiceClient, GetLatestBlockRequest};
use frost_core::{serde::{Deserialize, Serialize}, Identifier};
use frost_secp256k1_tr::Secp256K1Sha256;
use libp2p::{gossipsub::IdentTopic, Swarm};

use crate::{app::signer::Signer, protocols::{dkg::{self, prepare_response_for_task}, sign::SignMesage, TSSBehaviour}};

use super::{mem_store, now};
pub const HEART_BEAT_DURATION: tokio::time::Duration = tokio::time::Duration::from_secs(60);

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
Expand All @@ -23,8 +23,15 @@ impl SubscribeTopic {

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct HeartBeatMessage {
pub payload: HeartBeatPayload,
pub signature: Vec<u8>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct HeartBeatPayload {
pub identifier: Identifier<Secp256K1Sha256>,
pub last_seen: u64,
pub task_ids: Vec<String>,
}

pub fn subscribe_gossip_topics(swarm: &mut Swarm<TSSBehaviour>) {
Expand Down Expand Up @@ -56,27 +63,21 @@ pub fn publish_signing_package(swarm: &mut Swarm<TSSBehaviour>, signer: &Signer,
}

pub async fn sending_heart_beat(swarm: &mut Swarm<TSSBehaviour>, signer: &Signer) {
let host = signer.config().side_chain.grpc.clone();
let mut base_client = match TendermintServiceClient::connect(host).await {
Ok(c) => c,
Err(_) => return,
};
if let Ok(res) = base_client.get_latest_block(GetLatestBlockRequest{}).await {
let response = res.get_ref();
let blocktime = response
.block.as_ref().unwrap()
.header.as_ref().unwrap()
.time.as_ref().unwrap();

// let mut last = mem_store::LastSendingTime.lock().unwrap();
// sending alive message
let alive = HeartBeatMessage {

let last_seen = now() + mem_store::ALIVE_WINDOW;
let task_ids = signer.list_signing_tasks().iter().map(|a| a.id.clone()).collect::<Vec<_>>();
let payload = HeartBeatPayload {
identifier: signer.identifier().clone(),
last_seen: blocktime.seconds as u64
last_seen,
task_ids,
};
let bytes = serde_json::to_vec(&payload).unwrap();
let signature = signer.identity_key.sign(bytes, None).to_vec();
let alive = HeartBeatMessage { payload, signature };
let message = serde_json::to_vec(&alive).unwrap();
publish_message(swarm, SubscribeTopic::ALIVE, message);
};

mem_store::update_alive_table(alive);
}

fn publish_message(swarm: &mut Swarm<TSSBehaviour>, topic: SubscribeTopic, message: Vec<u8>) {
Expand Down
77 changes: 68 additions & 9 deletions src/helper/mem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use frost_secp256k1_tr::{keys::dkg, Identifier};
use std::sync::Mutex;
use lazy_static::lazy_static;

use crate::app::config::TASK_INTERVAL;
use crate::app::{config::TASK_INTERVAL, signer::Signer};

use super::{gossip::HeartBeatMessage, now};

Expand All @@ -23,28 +23,87 @@ lazy_static! {
pub static ref AliveTable: Mutex<BTreeMap<Identifier, u64>> = {
Mutex::new(BTreeMap::new())
};
pub static ref TrustedPeers: Mutex<Vec<Identifier>> = {
Mutex::new(Vec::new())
};
pub static ref TaskParticipants: Mutex<BTreeMap<String,Vec<Identifier>>> = {
Mutex::new(BTreeMap::new())
};
}

pub const ALIVE_WINDOW: u64 = TASK_INTERVAL.as_secs() * 2;

pub fn update_alive_table(alive: HeartBeatMessage) {
let mut table= AliveTable.lock().unwrap();
table.insert(alive.identifier, alive.last_seen);
table.retain(|_, v| {*v + 1800u64 > now()});
if alive.payload.last_seen > now() {
let mut table= AliveTable.lock().unwrap();
table.insert(alive.payload.identifier, alive.payload.last_seen);
table.retain(|_, v| {*v + 1800u64 > now()});

let mut tp = TaskParticipants.lock().unwrap();
alive.payload.task_ids.iter().for_each(|id| {
match tp.get_mut(id.as_str()) {
Some(t) => {
if !t.contains(&alive.payload.identifier) {
t.push(alive.payload.identifier.clone());
}
},
None => {
tp.insert(id.clone(), vec![alive.payload.identifier.clone()]);
}
};
});
}
}

pub fn remove_task_participants(task_id: &str) {
let mut tp = TaskParticipants.lock().unwrap();
tp.remove(task_id);
}

pub fn get_alive_participants(keys: &Vec<&Identifier>) -> usize {
pub fn count_task_participants(task_id: &str) -> Vec<Identifier> {

let tp = TaskParticipants.lock().unwrap();
let table= AliveTable.lock().unwrap();
match tp.get(task_id) {
Some(participants) => participants.iter().filter(|i| {
let last_seen = table.get(i).unwrap_or(&0);
*last_seen > now()
}).map(|i| i.clone()).collect::<Vec<_>>(),
None => vec![],
}
}

pub fn count_alive_participants(keys: &Vec<&Identifier>) -> usize {
let table= AliveTable.lock().unwrap();

let alive = keys.iter().filter(|key| {
let last_seen = table.get(key).unwrap_or(&0u64);
now() - last_seen < TASK_INTERVAL.as_secs() * 2
}).count() + 1;
now() < *last_seen
}).count();
// debug!("alive table: {alive}, {:?}", table);
alive
}

pub fn is_white_listed_peer(identifier: &Identifier) -> bool {
pub fn is_peer_alive(identifier: &Identifier) -> bool {
let table= AliveTable.lock().unwrap();
table.contains_key(identifier)
let last_seen = table.get(identifier).unwrap_or(&0u64);
now() < *last_seen
}

pub fn is_peer_trusted_peer(identifier: &Identifier, signer: &Signer) -> bool {
let mut table= TrustedPeers.lock().unwrap();
if table.contains(identifier) {
true
} else {
signer.list_keypairs().iter().any(|(_, kp)| {
if kp.pub_key.verifying_shares().contains_key(identifier) {
table.push(identifier.clone());
true
} else {
false
}
})
}
}

pub fn get_dkg_round1_secret_packet(task_id: &str) -> Option<dkg::round1::SecretPackage> {
Expand Down
Loading

0 comments on commit fb1843a

Please sign in to comment.