Skip to content

Commit

Permalink
chore: faster hash feature
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Dec 15, 2024
1 parent 03309ed commit 441343b
Show file tree
Hide file tree
Showing 22 changed files with 246 additions and 76 deletions.
4 changes: 0 additions & 4 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,2 @@
[target.'cfg(all())']
rustflags = ["--cfg", "tokio_unstable"]

[alias]
x = "run --package xtask --"
xtask = "run --package xtask --"
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ jobs:
- name: Sccache clear
run: sccache --zero-stats > /dev/null

- name: Run test with codecov
- name: Test with codecov
run: cargo llvm-cov --no-report nextest --all-features --features fastrace/enable

- name: Run example with codecov
- name: Test example with codecov
run: |
cargo llvm-cov --no-report run --example proxy
cargo llvm-cov --no-report run --example controller
cargo llvm-cov --no-report run --example tracing --features fastrace/enable
- name: Run bench with codecov
- name: Test bench with codecov
run: |
cargo llvm-cov --no-report --bench micro --features="micro-bench"
cargo llvm-cov --no-report --bench micro --all-features
- name: Generate codecov report
run: |
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ lru = "0.12"
parking_lot = "0.12"
pin-project-lite = "0.2"
rand = "0.8"
rustc-hash = { version = "2.1.0", optional = true }
serde = { version = "1", features = ["derive"], optional = true }
thiserror = "1"
tokio = { version = "1", features = ["net", "rt"], optional = true }
Expand All @@ -39,12 +40,13 @@ tokio = { version = "1", features = ["full"] }
default = ["tokio-rt", "serde"]
tokio-rt = ["dep:tokio"]
serde = ["dep:serde"]
rustc-hash = ["dep:rustc-hash"] # enable faster hash if the network is trusted
micro-bench = [] # for benchmark, do not enable it in normal use

[[bench]]
name = "micro"
harness = false
required-features = ["micro-bench"]
required-features = ["micro-bench", "rustc-hash"]

[profile.bench]
opt-level = 3
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Yet another project rewritten in Rust.
- Support `Unreliable`, `Reliable` and `ReliableOrdered` packets.
- Support multiple order channels.
- Support `ACK`/`NACK` mechanism.
- Support message priority for unordered reliability.
- Full tracing:
- You can track a packet's span during deduplication, fragmentation, ...

Expand Down Expand Up @@ -84,6 +85,6 @@ let config = client::Config::new()
...
let (_, writer) = socket.connect_to(<addr>, config).await?;
tokio::pin!(writer);
writer.send(Message::new(Reliability::Reliable, 0, Bytes::from_static(b"Hello, Anyone there?")))
writer.send(Message::new(Bytes::from_static(b"Hello, Anyone there?")))
.await?;
```
5 changes: 2 additions & 3 deletions benches/micro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ pub fn codec_benchmark(c: &mut Criterion) {
cnt: usize,
throughput: impl Fn(&BenchOpts) -> Throughput,
) {
let datagrams = repeat(Bytes::from_static(datagram)).take(cnt);
let opts = micro_bench::codec::BenchOpts {
datagrams: black_box(datagrams.collect()),
datagrams: repeat(Bytes::from_static(datagram)).take(cnt).collect(),
seed: 114514,
dup_ratio: 0.,
shuffle_ratio: 0.,
Expand All @@ -33,7 +32,7 @@ pub fn codec_benchmark(c: &mut Criterion) {
format!("decode_cnt-{cnt}_size-{}", datagram.len()),
|bencher| {
bencher.to_async(FuturesExecutor).iter_batched(
|| opts.gen_inputs(),
|| black_box(opts.gen_inputs()),
micro_bench::codec::run_bench,
BatchSize::SmallInput,
);
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ fn display(spans: Vec<SpanRecord>) {
.map(|span| (span.span_id, span.clone()))
.collect();
let adjacency_lists: HashMap<TraceId, HashMap<SpanId, Vec<SpanId>>> = spans.iter().fold(
std::collections::HashMap::new(),
HashMap::new(),
|mut map,
SpanRecord {
trace_id,
Expand Down
6 changes: 2 additions & 4 deletions src/codec/decoder/ordered.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

Expand All @@ -10,8 +9,7 @@ use pin_project_lite::pin_project;
use crate::errors::CodecError;
use crate::packet::connected::{self, Frame, FrameSet};
use crate::utils::u24;

const INITIAL_ORDERING_MAP_CAP: usize = 64;
use crate::HashMap;

struct Ordering {
map: HashMap<u24, FrameSet<Frame>>,
Expand All @@ -21,7 +19,7 @@ struct Ordering {
impl Default for Ordering {
fn default() -> Self {
Self {
map: HashMap::with_capacity(INITIAL_ORDERING_MAP_CAP),
map: HashMap::default(),
read: 0.into(),
}
}
Expand Down
24 changes: 16 additions & 8 deletions src/guard.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::collections::{BinaryHeap, VecDeque};
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
Expand All @@ -14,16 +14,24 @@ use crate::link::SharedLink;
use crate::opts::FlushStrategy;
use crate::packet::connected::{self, AckOrNack, Frame, FrameSet, Frames, FramesRef, Record};
use crate::packet::{Packet, FRAME_SET_HEADER_SIZE};
use crate::utils::{u24, ConnId, Reactor};
use crate::{Peer, Priority, Role};
use crate::utils::{combine_hashes, u24, Reactor};
use crate::{HashMap, Peer, Priority, Role};

// A frame with penalty
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug)]
struct PenaltyFrame {
penalty: u8,
frame: Frame,
}

impl PartialEq for PenaltyFrame {
fn eq(&self, other: &Self) -> bool {
self.penalty == other.penalty
}
}

impl Eq for PenaltyFrame {}

impl PartialOrd for PenaltyFrame {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
Expand Down Expand Up @@ -431,7 +439,7 @@ struct ResendMap {
impl ResendMap {
fn new(role: Role, peer: Peer, estimator: Box<dyn Estimator + Send + Sync + 'static>) -> Self {
Self {
map: HashMap::new(),
map: HashMap::default(),
role,
peer,
last_record_expired_at: Instant::now(),
Expand Down Expand Up @@ -593,15 +601,15 @@ impl ResendMap {
} else {
return Poll::Ready(());
}
let c_id = ConnId::new(self.role.guid(), self.peer.guid);
let key = combine_hashes(self.role.guid(), self.peer.guid);
trace!(
"[{}] wait on {c_id:?} for resend seq_num {} to {} within {:?}",
"[{}] wait on timer {key} for resend seq_num {} to {} within {:?}",
self.role,
seq_num,
self.peer,
expired_at - now
);
Reactor::get().insert_timer(c_id, expired_at, cx.waker());
Reactor::get().insert_timer(key, expired_at, cx.waker());
Poll::Pending
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ pub mod micro_bench {
}
}

#[cfg(feature = "rustc-hash")]
pub type HashMap<K, V, H = rustc_hash::FxBuildHasher> = std::collections::HashMap<K, V, H>;

#[cfg(not(feature = "rustc-hash"))]
pub type HashMap<K, V, H = std::hash::RandomState> = std::collections::HashMap<K, V, H>;

/// Unit tests
#[cfg(test)]
mod tests;
Expand Down
8 changes: 4 additions & 4 deletions src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use log::{debug, warn};

use crate::packet::connected::{self, AckOrNack, FrameBody, FrameSet, FramesMut};
use crate::packet::unconnected;
use crate::utils::{u24, ConnId, Reactor};
use crate::utils::{combine_hashes, u24, Reactor};
use crate::{Peer, Role};

/// Shared link between stream and sink
Expand Down Expand Up @@ -92,15 +92,15 @@ impl TransferLink {
}
// wake up after receiving an ack
if self.should_waking() {
let c_id = ConnId::new(self.role.guid(), self.peer.guid);
let key = combine_hashes(self.role.guid(), self.peer.guid);
let mut cnt = 0;
for waker in Reactor::get().cancel_all_timers(c_id) {
for waker in Reactor::get().cancel_all_timers(key) {
// safe to panic
waker.wake();
cnt += 1;
}
debug!(
"[{}] wake up {cnt} wakers after receives ack on connection: {c_id:?}",
"[{}] wake up {cnt} wakers after receives ack on timer {key}",
self.role
);
}
Expand Down
6 changes: 4 additions & 2 deletions src/packet/connected/ack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use crate::errors::CodecError;
use crate::packet::read_buf;
use crate::utils::{u24, BufExt, BufMutExt};

#[derive(PartialEq, Clone)]
#[derive(Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(crate) struct AckOrNack {
pub(crate) records: Vec<Record>,
}
Expand Down Expand Up @@ -123,7 +124,8 @@ impl AckOrNack {
const RECORD_RANGE: u8 = 0;
const RECORD_SINGLE: u8 = 1;

#[derive(PartialEq, Clone)]
#[derive(Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(crate) enum Record {
Range(u24, u24),
Single(u24),
Expand Down
20 changes: 12 additions & 8 deletions src/packet/connected/frame_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ pub(crate) type FramesMut = Vec<Frame<BytesMut>>;

pub(crate) type FrameMut = Frame<BytesMut>;

#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(crate) struct FrameSet<S> {
pub(crate) seq_num: u24,
pub(crate) set: S,
Expand Down Expand Up @@ -51,7 +52,8 @@ impl<'a> FrameSet<FramesRef<'a>> {
}
}

#[derive(PartialEq, Eq, Clone)]
#[derive(Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(crate) struct Frame<B = Bytes> {
pub(crate) flags: Flags,
pub(crate) reliable_frame_index: Option<u24>,
Expand Down Expand Up @@ -185,12 +187,13 @@ impl<B: Buf> Frame<B> {

/// Top 3 bits are reliability type, fourth bit is 1 when the frame is fragmented and part of a
/// compound.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(crate) struct Flags {
raw: u8,
pub(crate) reliability: Reliability,
pub(crate) parted: bool,
needs_bas: bool,
// needs_bas: bool,
}

impl Flags {
Expand All @@ -204,7 +207,6 @@ impl Flags {
raw,
reliability,
parted,
needs_bas: true,
}
}

Expand All @@ -225,12 +227,12 @@ impl Flags {
raw,
reliability: unsafe { std::mem::transmute::<u8, Reliability>(r) },
parted: raw & PARTED_FLAG != 0,
needs_bas: raw & NEEDS_B_AND_AS_FLAG != 0,
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(crate) struct Fragment {
pub(crate) parted_size: u32,
pub(crate) parted_id: u16,
Expand All @@ -253,7 +255,8 @@ impl Fragment {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(crate) struct Ordered {
pub(crate) frame_index: u24,
pub(crate) channel: u8,
Expand All @@ -277,6 +280,7 @@ impl Ordered {
const MAX_SYSTEM_ADDRESSES_ENDPOINTS: usize = 20;

#[derive(Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(crate) enum FrameBody {
ConnectedPing {
client_timestamp: i64,
Expand Down
3 changes: 2 additions & 1 deletion src/packet/connected/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ pub(crate) use frame_set::*;
use super::{ACK_FLAG, CONTINUOUS_SEND_FLAG, NACK_FLAG, NEEDS_B_AND_AS_FLAG, VALID_FLAG};

// Packet when RakNet has established a connection
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(crate) enum Packet<S> {
FrameSet(FrameSet<S>),
Ack(AckOrNack),
Expand Down
3 changes: 2 additions & 1 deletion src/packet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ impl PackType {
}

/// Raknet packet
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(crate) enum Packet<S> {
Unconnected(unconnected::Packet),
Connected(connected::Packet<S>),
Expand Down
3 changes: 2 additions & 1 deletion src/packet/unconnected.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::errors::CodecError;
use crate::packet::{read_buf, MagicRead, MagicWrite, PackType, SocketAddrRead, SocketAddrWrite};

/// Request sent before establishing a connection
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(crate) enum Packet {
UnconnectedPing {
send_timestamp: i64,
Expand Down
7 changes: 3 additions & 4 deletions src/server/handler/offline.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use std::num::NonZeroUsize;
Expand All @@ -14,7 +13,7 @@ use pin_project_lite::pin_project;

use crate::packet::connected::{self, FramesMut};
use crate::packet::{unconnected, Packet};
use crate::{Peer, Role};
use crate::{HashMap, Peer, Role};

#[derive(Debug, Clone)]
pub(crate) struct Config {
Expand Down Expand Up @@ -59,7 +58,7 @@ pin_project! {
// and it will be popped out during the OpenConnectionRequest2
// or when the connection is disconnected.
pending: lru::LruCache<SocketAddr, u8>,
// A `HashMap<SocketAddr, Peer>` that caches connections
// A hashmap that caches connections
// in the OpenConnectionRequest2 stage and is cleaned up on disconnection.
// The `connected` map is used to check if a `Peer` has completed the connection
// from the socket.
Expand All @@ -85,7 +84,7 @@ where
guid: config.sever_guid,
},
config,
connected: HashMap::new(),
connected: HashMap::default(),
state: OfflineState::Listening,
read_span: None,
}
Expand Down
Loading

0 comments on commit 441343b

Please sign in to comment.