Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/lexnv/fix-crypto-noise' into lex…
Browse files Browse the repository at this point in the history
…nv/fix-crypto-noise
  • Loading branch information
lexnv committed Oct 30, 2024
2 parents d2583a8 + 38b78c6 commit d6667ed
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 55 deletions.
4 changes: 2 additions & 2 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,12 @@ pub enum KademliaEvent {
/// The type of the DHT records.
#[derive(Debug, Clone)]
pub enum RecordsType {
/// Record was found in the local store.
/// Record was found in the local store and [`Quorum::One`] was used.
///
/// This contains only a single result.
LocalStore(Record),

/// Records found in the network.
/// Records found in the network. This can include the locally found record.
Network(Vec<PeerRecord>),
}

Expand Down
2 changes: 1 addition & 1 deletion src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,7 @@ impl Kademlia {
.closest(&Key::new(key), self.replication_factor)
.into(),
quorum,
if record.is_some() { 1 } else { 0 },
record.cloned(),
);
}
}
Expand Down
16 changes: 10 additions & 6 deletions src/protocol/libp2p/kademlia/query/get_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ pub struct GetRecordContext {

impl GetRecordContext {
/// Create new [`GetRecordContext`].
pub fn new(config: GetRecordConfig, in_peers: VecDeque<KademliaPeer>) -> Self {
pub fn new(
config: GetRecordConfig,
in_peers: VecDeque<KademliaPeer>,
found_records: Vec<PeerRecord>,
) -> Self {
let mut candidates = BTreeMap::new();

for candidate in &in_peers {
Expand All @@ -123,7 +127,7 @@ impl GetRecordContext {
candidates,
pending: HashMap::new(),
queried: HashSet::new(),
found_records: Vec::new(),
found_records,
}
}

Expand Down Expand Up @@ -378,7 +382,7 @@ mod tests {
#[test]
fn completes_when_no_candidates() {
let config = default_config();
let mut context = GetRecordContext::new(config, VecDeque::new());
let mut context = GetRecordContext::new(config, VecDeque::new(), Vec::new());
assert!(context.is_done());
let event = context.next_action().unwrap();
assert_eq!(event, QueryAction::QueryFailed { query: QueryId(0) });
Expand All @@ -387,7 +391,7 @@ mod tests {
known_records: 1,
..default_config()
};
let mut context = GetRecordContext::new(config, VecDeque::new());
let mut context = GetRecordContext::new(config, VecDeque::new(), Vec::new());
assert!(context.is_done());
let event = context.next_action().unwrap();
assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) });
Expand All @@ -405,7 +409,7 @@ mod tests {
assert_eq!(in_peers_set.len(), 3);

let in_peers = in_peers_set.iter().map(|peer| peer_to_kad(*peer)).collect();
let mut context = GetRecordContext::new(config, in_peers);
let mut context = GetRecordContext::new(config, in_peers, Vec::new());

for num in 0..3 {
let event = context.next_action().unwrap();
Expand Down Expand Up @@ -444,7 +448,7 @@ mod tests {
assert_eq!(in_peers_set.len(), 3);

let in_peers = [peer_a, peer_b, peer_c].iter().map(|peer| peer_to_kad(*peer)).collect();
let mut context = GetRecordContext::new(config, in_peers);
let mut context = GetRecordContext::new(config, in_peers, Vec::new());

// Schedule peer queries.
for num in 0..3 {
Expand Down
16 changes: 12 additions & 4 deletions src/protocol/libp2p/kademlia/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ impl QueryEngine {
target: RecordKey,
candidates: VecDeque<KademliaPeer>,
quorum: Quorum,
count: usize,
local_record: Option<Record>,
) -> QueryId {
tracing::debug!(
target: LOG_TARGET,
Expand All @@ -331,18 +331,26 @@ impl QueryEngine {
let target = Key::new(target);
let config = GetRecordConfig {
local_peer_id: self.local_peer_id,
known_records: count,
known_records: if local_record.is_some() { 1 } else { 0 },
quorum,
replication_factor: self.replication_factor,
parallelism_factor: self.parallelism_factor,
query: query_id,
target,
};

let found_records = local_record
.into_iter()
.map(|record| PeerRecord {
peer: self.local_peer_id,
record,
})
.collect();

self.queries.insert(
query_id,
QueryType::GetRecord {
context: GetRecordContext::new(config, candidates),
context: GetRecordContext::new(config, candidates, found_records),
},
);

Expand Down Expand Up @@ -883,7 +891,7 @@ mod tests {
]
.into(),
Quorum::All,
3,
None,
);

for _ in 0..4 {
Expand Down
60 changes: 46 additions & 14 deletions src/transport/quic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use multiaddr::{Multiaddr, Protocol};
use quinn::{ClientConfig, Connecting, Connection, Endpoint, IdleTimeout};

use std::{
collections::{HashMap, HashSet},
collections::HashMap,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -120,9 +120,9 @@ pub(crate) struct QuicTransport {
/// Opened raw connection, waiting for approval/rejection from `TransportManager`.
opened_raw: HashMap<ConnectionId, (NegotiatedConnection, Multiaddr)>,

/// Canceled raw connections.
canceled: HashSet<ConnectionId>,

/// Cancel raw connections futures.
///
/// This is cancelling `Self::pending_raw_connections`.
cancel_futures: HashMap<ConnectionId, AbortHandle>,
}

Expand Down Expand Up @@ -235,7 +235,6 @@ impl TransportBuilder for QuicTransport {
context,
config,
listener,
canceled: HashSet::new(),
opened_raw: HashMap::new(),
pending_open: HashMap::new(),
pending_dials: HashMap::new(),
Expand Down Expand Up @@ -477,8 +476,11 @@ impl Transport for QuicTransport {

/// Cancel opening connections.
fn cancel(&mut self, connection_id: ConnectionId) {
self.canceled.insert(connection_id);
self.cancel_futures.remove(&connection_id).map(|handle| handle.abort());
// Cancel the future if it exists.
// State clean-up happens inside the `poll_next`.
if let Some(handle) = self.cancel_futures.get(&connection_id) {
handle.abort();
}
}
}

Expand Down Expand Up @@ -510,27 +512,57 @@ impl Stream for QuicTransport {
connection_id,
address,
stream,
} =>
if !self.canceled.remove(&connection_id) {
} => {
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
?address,
"raw connection without a cancel handle",
);
continue;
};

if !handle.is_aborted() {
self.opened_raw.insert(connection_id, (stream, address.clone()));

return Poll::Ready(Some(TransportEvent::ConnectionOpened {
connection_id,
address,
}));
},
}
}

RawConnectionResult::Failed {
connection_id,
errors,
} =>
if !self.canceled.remove(&connection_id) {
} => {
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
?errors,
"raw connection without a cancel handle",
);
continue;
};

if !handle.is_aborted() {
return Poll::Ready(Some(TransportEvent::OpenFailure {
connection_id,
errors,
}));
},
}
}

RawConnectionResult::Canceled { connection_id } => {
self.canceled.remove(&connection_id);
if self.cancel_futures.remove(&connection_id).is_none() {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
"raw cancelled connection without a cancel handle",
);
}
}
}
}
Expand Down
59 changes: 45 additions & 14 deletions src/transport/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use socket2::{Domain, Socket, Type};
use tokio::net::TcpStream;

use std::{
collections::{HashMap, HashSet},
collections::HashMap,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -121,9 +121,9 @@ pub(crate) struct TcpTransport {
/// Opened raw connection, waiting for approval/rejection from `TransportManager`.
opened_raw: HashMap<ConnectionId, (TcpStream, Multiaddr)>,

/// Canceled raw connections.
canceled: HashSet<ConnectionId>,

/// Cancel raw connections futures.
///
/// This is cancelling `Self::pending_raw_connections`.
cancel_futures: HashMap<ConnectionId, AbortHandle>,

/// Connections which have been opened and negotiated but are being validated by the
Expand Down Expand Up @@ -291,7 +291,6 @@ impl TransportBuilder for TcpTransport {
config,
context,
dial_addresses,
canceled: HashSet::new(),
opened_raw: HashMap::new(),
pending_open: HashMap::new(),
pending_dials: HashMap::new(),
Expand Down Expand Up @@ -516,8 +515,11 @@ impl Transport for TcpTransport {
}

fn cancel(&mut self, connection_id: ConnectionId) {
self.canceled.insert(connection_id);
self.cancel_futures.remove(&connection_id).map(|handle| handle.abort());
// Cancel the future if it exists.
// State clean-up happens inside the `poll_next`.
if let Some(handle) = self.cancel_futures.get(&connection_id) {
handle.abort();
}
}
}

Expand Down Expand Up @@ -560,27 +562,56 @@ impl Stream for TcpTransport {
connection_id,
address,
stream,
} =>
if !self.canceled.remove(&connection_id) {
} => {
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
?address,
"raw connection without a cancel handle",
);
continue;
};

if !handle.is_aborted() {
self.opened_raw.insert(connection_id, (stream, address.clone()));

return Poll::Ready(Some(TransportEvent::ConnectionOpened {
connection_id,
address,
}));
},
}
}

RawConnectionResult::Failed {
connection_id,
errors,
} =>
if !self.canceled.remove(&connection_id) {
} => {
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
?errors,
"raw connection without a cancel handle",
);
continue;
};

if !handle.is_aborted() {
return Poll::Ready(Some(TransportEvent::OpenFailure {
connection_id,
errors,
}));
},
}
}
RawConnectionResult::Canceled { connection_id } => {
self.canceled.remove(&connection_id);
if self.cancel_futures.remove(&connection_id).is_none() {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
"raw cancelled connection without a cancel handle",
);
}
}
}
}
Expand Down
Loading

0 comments on commit d6667ed

Please sign in to comment.