Skip to content

Commit

Permalink
address_store: Improve address tracking and add eviction algorithm (#250
Browse files Browse the repository at this point in the history
)

This PR improves the transport manager's address tracking to keep a
healthier view of addresses.

General changes:
- Fixes a bug where `listener` addresses were tracked instead of
`dialing` addresses (incoming connections may be established with
ephemeral ports and there's no guarantee the remote is listening on
them)
- `PeerIdMismatch` error coming from the noise handshake redirects the
address to the appropriate peer for healthier addresses
- Addresses are tracked first, regardless of the peer state to ensure we
update our view of the addresses (reachable or not)

Address Store changes:
- The store is bounded to a maximum of 64 tracked addresses
- Removing and reinserting addresses is error-prone, so the address store
now updates addresses in place
- Introduces an eviction algorithm that keeps the addresses with the
highest scores when the store is at capacity

### Testing Done
- added extra tests to the address store
- tested with subp2p-explorer for discovering kusama


This PR is part of a bigger refactor to keep track of healthier
addresses.
It is essentially a breakdown of
#248 for an easier review
process.

---------

Signed-off-by: Alexandru Vasile <[email protected]>
Co-authored-by: Dmitry Markin <[email protected]>
  • Loading branch information
lexnv and dmitry-markin authored Oct 25, 2024
1 parent e378e76 commit 2ef3751
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 160 deletions.
204 changes: 146 additions & 58 deletions src/transport/manager/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,29 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{types::ConnectionId, PeerId};
use crate::{error::DialError, types::ConnectionId, PeerId};

use multiaddr::{Multiaddr, Protocol};
use multihash::Multihash;

use std::collections::{BinaryHeap, HashSet};
use std::collections::{hash_map::Entry, HashMap};

/// Maximum number of addresses tracked for a peer.
///
/// Used by [`AddressStore::new`] as both the initial map capacity and the
/// store's `max_capacity`.
const MAX_ADDRESSES: usize = 64;

/// Score values applied to address records.
pub mod scores {
    /// Awarded when a connection to the address was successfully established.
    pub const CONNECTION_ESTABLISHED: i32 = 100;

    /// Applied when connecting failed because the address was invalid or unreachable.
    pub const CONNECTION_FAILURE: i32 = -100;

    /// Applied when an invalid address was provided.
    ///
    /// Such an address can never be reached.
    pub const ADDRESS_FAILURE: i32 = i32::MIN;
}

#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Hash)]
Expand Down Expand Up @@ -134,11 +151,10 @@ impl Ord for AddressRecord {
/// Store for peer addresses.
#[derive(Debug)]
pub struct AddressStore {
//// Addresses sorted by score.
pub by_score: BinaryHeap<AddressRecord>,

/// Addresses queryable by hashing them for faster lookup.
pub by_address: HashSet<Multiaddr>,
/// Addresses available.
pub addresses: HashMap<Multiaddr, AddressRecord>,
/// Maximum capacity of the address store.
max_capacity: usize,
}

impl FromIterator<Multiaddr> for AddressStore {
Expand All @@ -158,8 +174,7 @@ impl FromIterator<AddressRecord> for AddressStore {
fn from_iter<T: IntoIterator<Item = AddressRecord>>(iter: T) -> Self {
let mut store = AddressStore::new();
for record in iter {
store.by_address.insert(record.address.clone());
store.by_score.push(record);
store.insert(record);
}

store
Expand All @@ -186,52 +201,65 @@ impl AddressStore {
/// Create new [`AddressStore`].
pub fn new() -> Self {
Self {
by_score: BinaryHeap::new(),
by_address: HashSet::new(),
addresses: HashMap::with_capacity(MAX_ADDRESSES),
max_capacity: MAX_ADDRESSES,
}
}

/// Check if [`AddressStore`] is empty.
pub fn is_empty(&self) -> bool {
self.by_score.is_empty()
/// Get the score for a given error.
pub fn error_score(error: &DialError) -> i32 {
match error {
DialError::AddressError(_) => scores::ADDRESS_FAILURE,
_ => scores::CONNECTION_FAILURE,
}
}

/// Check if address is already in the address store.
pub fn contains(&self, address: &Multiaddr) -> bool {
self.by_address.contains(address)
/// Check if [`AddressStore`] is empty.
///
/// Returns `true` when no addresses are currently tracked.
pub fn is_empty(&self) -> bool {
self.addresses.is_empty()
}

/// Insert new address record into [`AddressStore`] with default address score.
pub fn insert(&mut self, mut record: AddressRecord) {
if self.by_address.contains(record.address()) {
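/// Insert the address record into [`AddressStore`] with the provided score.
///
/// If the address is already in the store, the existing record is kept and the
/// provided score is applied to it in place via `update_score`.
/// Otherwise the record is inserted; when the store is at capacity, the
/// lowest-scored record is evicted first, unless the new record scores lower
/// than it, in which case the new record is dropped.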
pub fn insert(&mut self, record: AddressRecord) {
if let Entry::Occupied(mut occupied) = self.addresses.entry(record.address.clone()) {
occupied.get_mut().update_score(record.score);
return;
}

record.connection_id = None;
self.by_address.insert(record.address.clone());
self.by_score.push(record);
}

/// Pop address with the highest score from [`AddressStore`].
pub fn pop(&mut self) -> Option<AddressRecord> {
self.by_score.pop().map(|record| {
self.by_address.remove(&record.address);
record
})
}

/// Take at most `limit` `AddressRecord`s from [`AddressStore`].
pub fn take(&mut self, limit: usize) -> Vec<AddressRecord> {
let mut records = Vec::new();

for _ in 0..limit {
match self.pop() {
Some(record) => records.push(record),
None => break,
// The eviction algorithm favours addresses with higher scores.
//
// This algorithm has the following implications:
// - it keeps the best addresses in the store.
// - if the store is at capacity, the worst address will be evicted.
// - an address that is not dialed yet (with score zero) will be preferred over an address
// that already failed (with negative score).
if self.addresses.len() >= self.max_capacity {
let min_record = self
.addresses
.values()
.min()
.cloned()
.expect("There is at least one element checked above; qed");

// The lowest score is better than the new record.
if record.score < min_record.score {
return;
}
self.addresses.remove(min_record.address());
}

records
// Insert the record.
self.addresses.insert(record.address.clone(), record);
}

/// Return up to `limit` tracked addresses, ordered from highest to lowest score.
///
/// The addresses stay in the store; only the returned addresses are cloned.
/// The relative order of addresses with equal scores is unspecified, because
/// the records are read from a `HashMap`.
pub fn addresses(&self, limit: usize) -> Vec<Multiaddr> {
    // Sort references rather than cloned records, so that only the `limit`
    // addresses actually handed back to the caller are cloned.
    let mut records = self.addresses.values().collect::<Vec<_>>();
    // Tie order is already arbitrary (map iteration order), so the cheaper
    // unstable sort loses nothing.
    records.sort_unstable_by_key(|record| std::cmp::Reverse(record.score));
    records.into_iter().take(limit).map(|record| record.address.clone()).collect()
}
}

Expand All @@ -256,7 +284,7 @@ mod tests {
),
rng.gen_range(1..=65535),
));
let score: i32 = rng.gen();
let score: i32 = rng.gen_range(10..=200);

AddressRecord::new(
&peer,
Expand All @@ -279,7 +307,7 @@ mod tests {
),
rng.gen_range(1..=65535),
));
let score: i32 = rng.gen();
let score: i32 = rng.gen_range(10..=200);

AddressRecord::new(
&peer,
Expand All @@ -303,7 +331,7 @@ mod tests {
),
rng.gen_range(1..=65535),
));
let score: i32 = rng.gen();
let score: i32 = rng.gen_range(10..=200);

AddressRecord::new(
&peer,
Expand Down Expand Up @@ -331,19 +359,22 @@ mod tests {
store.insert(quic_address_record(&mut rng));
}

let known_addresses = store.by_address.len();
let known_addresses = store.addresses.len();
assert!(known_addresses >= 3);

let taken = store.take(known_addresses - 2);
let taken = store.addresses(known_addresses - 2);
assert_eq!(known_addresses - 2, taken.len());
assert!(!store.is_empty());

let mut prev: Option<AddressRecord> = None;
for record in taken {
assert!(!store.contains(record.address()));
for address in taken {
// Addresses are still in the store.
assert!(store.addresses.contains_key(&address));

let record = store.addresses.get(&address).unwrap().clone();

if let Some(previous) = prev {
assert!(previous.score > record.score);
assert!(previous.score >= record.score);
}

prev = Some(record);
Expand All @@ -359,18 +390,19 @@ mod tests {
store.insert(ws_address_record(&mut rng));
store.insert(quic_address_record(&mut rng));

assert_eq!(store.by_address.len(), 3);
assert_eq!(store.addresses.len(), 3);

let taken = store.take(8usize);
let taken = store.addresses(8usize);
assert_eq!(taken.len(), 3);
assert!(store.is_empty());

let mut prev: Option<AddressRecord> = None;
for record in taken {
let record = store.addresses.get(&record).unwrap().clone();

if prev.is_none() {
prev = Some(record);
} else {
assert!(prev.unwrap().score > record.score);
assert!(prev.unwrap().score >= record.score);
prev = Some(record);
}
}
Expand Down Expand Up @@ -401,10 +433,9 @@ mod tests {
.collect::<HashMap<_, _>>();
store.extend(records);

for record in store.by_score {
for record in store.addresses.values().cloned() {
let stored = cloned.get(record.address()).unwrap();
assert_eq!(stored.score(), record.score());
assert_eq!(stored.connection_id(), record.connection_id());
assert_eq!(stored.address(), record.address());
}
}
Expand Down Expand Up @@ -433,11 +464,68 @@ mod tests {
let cloned = records.iter().cloned().collect::<HashMap<_, _>>();
store.extend(records.iter().map(|(_, record)| record));

for record in store.by_score {
for record in store.addresses.values().cloned() {
let stored = cloned.get(record.address()).unwrap();
assert_eq!(stored.score(), record.score());
assert_eq!(stored.connection_id(), record.connection_id());
assert_eq!(stored.address(), record.address());
}
}

/// Inserting the same address twice keeps a single entry whose score ends up
/// at twice the inserted score.
#[test]
fn insert_record() {
    let mut rng = rand::thread_rng();
    let mut store = AddressStore::new();

    let record = {
        let mut record = tcp_address_record(&mut rng);
        record.score = 10;
        record
    };

    // First insertion: the record is stored as provided.
    store.insert(record.clone());
    assert_eq!(store.addresses.len(), 1);
    assert_eq!(store.addresses.get(record.address()), Some(&record));

    // Second insertion: the existing entry is updated instead of duplicated.
    store.insert(record.clone());
    assert_eq!(store.addresses.len(), 1);
    let updated = store.addresses.get(record.address()).expect("record was inserted above");
    assert_eq!(updated.score, record.score * 2);
}

/// A store at capacity ignores records scoring below its minimum and
/// otherwise evicts its lowest-scored record to make room.
#[test]
fn evict_on_capacity() {
    let mut store = AddressStore {
        addresses: HashMap::new(),
        max_capacity: 2,
    };
    let mut rng = rand::thread_rng();

    let mut established = tcp_address_record(&mut rng);
    established.score = scores::CONNECTION_ESTABLISHED;
    let mut undialed = ws_address_record(&mut rng);
    undialed.score = 0;

    // Fill the store up to its capacity; scores are now [100, 0].
    for record in [&established, &undialed] {
        store.insert(record.clone());
    }
    assert_eq!(store.addresses.len(), 2);

    // A failed address scores below everything tracked, so it is ignored.
    let mut failed = quic_address_record(&mut rng);
    failed.score = scores::CONNECTION_FAILURE;
    store.insert(failed.clone());
    assert_eq!(store.addresses.len(), 2);
    for kept in [&established, &undialed] {
        assert!(store.addresses.contains_key(kept.address()));
    }

    // A record scoring above the current minimum (0) replaces that minimum.
    let mut better = quic_address_record(&mut rng);
    better.score = 1;
    store.insert(better.clone());
    assert_eq!(store.addresses.len(), 2);
    for kept in [&established, &better] {
        assert!(store.addresses.contains_key(kept.address()));
    }
}
}
Loading

0 comments on commit 2ef3751

Please sign in to comment.