Skip to content

Commit

Permalink
remove redundant B: Buf
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Sep 24, 2024
1 parent f269e73 commit b3baadd
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 51 deletions.
18 changes: 9 additions & 9 deletions src/codec/decoder/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::Stream;
use pin_project_lite::pin_project;

use crate::errors::CodecError;
use crate::packet::connected::{FrameSet, Frames};
use crate::packet::connected::{FrameSet, FramesMut};
use crate::utils::{u24, BitVecQueue};

/// The deduplication window. For each connect, the maximum size is
Expand Down Expand Up @@ -66,9 +66,9 @@ pub(crate) trait Deduplicated: Sized {
fn deduplicated(self) -> Dedup<Self>;
}

impl<F, B> Deduplicated for F
impl<F> Deduplicated for F
where
F: Stream<Item = Result<FrameSet<Frames<B>>, CodecError>>,
F: Stream<Item = Result<FrameSet<FramesMut>, CodecError>>,
{
fn deduplicated(self) -> Dedup<Self> {
Dedup {
Expand All @@ -79,11 +79,11 @@ where
}
}

impl<F, B> Stream for Dedup<F>
impl<F> Stream for Dedup<F>
where
F: Stream<Item = Result<FrameSet<Frames<B>>, CodecError>>,
F: Stream<Item = Result<FrameSet<FramesMut>, CodecError>>,
{
type Item = Result<FrameSet<Frames<B>>, CodecError>;
type Item = Result<FrameSet<FramesMut>, CodecError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
Expand Down Expand Up @@ -115,7 +115,7 @@ where
mod test {
use std::ops::Sub;

use bytes::Bytes;
use bytes::BytesMut;
use futures::StreamExt;
use futures_async_stream::stream;
use indexmap::IndexSet;
Expand Down Expand Up @@ -172,7 +172,7 @@ mod test {
assert_eq!(window.received_status.len(), 0);
}

fn frame_set(idx: impl IntoIterator<Item = u32>) -> FrameSet<Frames> {
fn frame_set(idx: impl IntoIterator<Item = u32>) -> FrameSet<FramesMut> {
FrameSet {
seq_num: 0.into(),
set: idx
Expand All @@ -183,7 +183,7 @@ mod test {
seq_frame_index: None,
ordered: None,
fragment: None,
body: Bytes::new(),
body: BytesMut::new(),
})
.collect(),
}
Expand Down
27 changes: 13 additions & 14 deletions src/codec/decoder/ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use bytes::Buf;
use fastrace::{Event, Span};
use futures::Stream;
use log::warn;
Expand All @@ -14,12 +13,12 @@ use crate::utils::u24;

const INITIAL_ORDERING_MAP_CAP: usize = 64;

struct Ordering<B> {
map: HashMap<u24, FrameSet<Frame<B>>>,
struct Ordering {
map: HashMap<u24, FrameSet<Frame>>,
read: u24,
}

impl<B> Default for Ordering<B> {
impl Default for Ordering {
fn default() -> Self {
Self {
map: HashMap::with_capacity(INITIAL_ORDERING_MAP_CAP),
Expand All @@ -30,25 +29,25 @@ impl<B> Default for Ordering<B> {

pin_project! {
// Ordering layer, ordered the packets based on ordering_frame_index.
pub(crate) struct Order<F, B> {
pub(crate) struct Order<F> {
#[pin]
frame: F,
// Max ordered channel that will be used in detailed protocol
max_channels: usize,
ordering: Vec<Ordering<B>>,
ordering: Vec<Ordering>,
span: Option<Span>,
}
}

pub(crate) trait Ordered<B: Buf>: Sized {
fn ordered(self, max_channels: usize) -> Order<Self, B>;
pub(crate) trait Ordered: Sized {
fn ordered(self, max_channels: usize) -> Order<Self>;
}

impl<F, B: Buf> Ordered<B> for F
impl<F> Ordered for F
where
F: Stream<Item = Result<FrameSet<Frame<B>>, CodecError>>,
F: Stream<Item = Result<FrameSet<Frame>, CodecError>>,
{
fn ordered(self, max_channels: usize) -> Order<Self, B> {
fn ordered(self, max_channels: usize) -> Order<Self> {
assert!(
max_channels < usize::from(u8::MAX),
"max channels should not be larger than u8::MAX"
Expand All @@ -66,11 +65,11 @@ where
}
}

impl<F, B> Stream for Order<F, B>
impl<F> Stream for Order<F>
where
F: Stream<Item = Result<FrameSet<Frame<B>>, CodecError>>,
F: Stream<Item = Result<FrameSet<Frame>, CodecError>>,
{
type Item = Result<FrameSet<Frame<B>>, CodecError>;
type Item = Result<FrameSet<Frame>, CodecError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
Expand Down
8 changes: 3 additions & 5 deletions src/codec/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use bytes::{Buf, BytesMut};
use bytes::BytesMut;
use fastrace::{Event, Span};
use futures::{Sink, Stream};
use log::error;
Expand Down Expand Up @@ -142,9 +142,7 @@ impl<T: AsyncSocket> Stream for Framed<T> {
}

/// The `Sink` implementation for cheap buffer cloning (i.e. `bytes::Bytes`).
impl<'a, B: Buf + Clone, T: AsyncSocket> Sink<(Packet<FramesRef<'a, B>>, SocketAddr)>
for Framed<T>
{
impl<'a, T: AsyncSocket> Sink<(Packet<FramesRef<'a>>, SocketAddr)> for Framed<T> {
type Error = io::Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Expand All @@ -153,7 +151,7 @@ impl<'a, B: Buf + Clone, T: AsyncSocket> Sink<(Packet<FramesRef<'a, B>>, SocketA

fn start_send(
self: Pin<&mut Self>,
item: (Packet<FramesRef<'a, B>>, SocketAddr),
item: (Packet<FramesRef<'a>>, SocketAddr),
) -> Result<(), Self::Error> {
let (frame, out_addr) = item;

Expand Down
35 changes: 18 additions & 17 deletions src/packet/connected/frame_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ use crate::packet::{
use crate::utils::{u24, BufExt, BufMutExt};
use crate::Reliability;

pub(crate) type Frames<B = Bytes> = Vec<Frame<B>>;
pub(crate) type Frames = Vec<Frame>;

// Cheap slice of a frames vector to reduce heap allocation
pub(crate) type FramesRef<'a, B = Bytes> = &'a [Frame<B>];
pub(crate) type FramesRef<'a> = &'a [Frame];

pub(crate) type FramesMut = Frames<BytesMut>;
pub(crate) type FramesMut = Vec<Frame<BytesMut>>;

pub(crate) type FrameMut = Frame<BytesMut>;

Expand All @@ -42,7 +42,7 @@ impl FrameSet<FramesMut> {
}
}

impl<'a, B: Buf + Clone> FrameSet<FramesRef<'a, B>> {
impl<'a> FrameSet<FramesRef<'a>> {
pub(super) fn write(self, buf: &mut BytesMut) {
buf.put_u24_le(self.seq_num);
for frame in self.set {
Expand Down Expand Up @@ -144,19 +144,8 @@ impl FrameMut {
}
}

impl<B: Buf> Frame<B> {
/// Get the total size of this frame
pub(crate) fn size(&self) -> usize {
let mut size = self.flags.reliability.size();
if self.fragment.is_some() {
size += FRAGMENT_PART_SIZE;
}
size += self.body.remaining();
size
}
}

impl<B: Buf + Clone> Frame<B> {
impl Frame {
// Write without taking ownership
fn write_ref(&self, buf: &mut BytesMut) {
self.flags.write(buf);
// length in bits
Expand All @@ -182,6 +171,18 @@ impl<B: Buf + Clone> Frame<B> {
}
}

impl<B: Buf> Frame<B> {
/// Get the total size of this frame
pub(crate) fn size(&self) -> usize {
let mut size = self.flags.reliability.size();
if self.fragment.is_some() {
size += FRAGMENT_PART_SIZE;
}
size += self.body.remaining();
size
}
}

/// Top 3 bits are reliability type, fourth bit is 1 when the frame is fragmented and part of a
/// compound.
#[derive(Debug, Clone, PartialEq)]
Expand Down
4 changes: 2 additions & 2 deletions src/packet/connected/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bytes::{Buf, BufMut, BytesMut};
use bytes::{BufMut, BytesMut};

use crate::errors::CodecError;
use crate::packet::PackType;
Expand Down Expand Up @@ -38,7 +38,7 @@ impl<S> Packet<S> {
}

/// For cheap buffers cloning (i.e. `bytes::Bytes`)
impl<'a, B: Buf + Clone> Packet<FramesRef<'a, B>> {
impl<'a> Packet<FramesRef<'a>> {
pub(crate) fn write(self, buf: &mut BytesMut) {
match self {
Packet::FrameSet(frame) => {
Expand Down
2 changes: 1 addition & 1 deletion src/packet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub(crate) enum Packet<S> {
Connected(connected::Packet<S>),
}

impl<'a, B: Buf + Clone> Packet<FramesRef<'a, B>> {
impl<'a> Packet<FramesRef<'a>> {
pub(crate) fn write(self, buf: &mut BytesMut) {
match self {
Packet::Unconnected(packet) => {
Expand Down
6 changes: 3 additions & 3 deletions src/server/handler/offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ where
mod test {
use std::collections::VecDeque;

use connected::{FrameSet, Frames};
use connected::FrameSet;
use futures::StreamExt;

use super::*;
Expand Down Expand Up @@ -360,7 +360,7 @@ mod test {
.chain(std::iter::once(Packet::Connected(
connected::Packet::FrameSet(FrameSet {
seq_num: 0.into(),
set: Frames::new(),
set: FramesMut::new(),
}),
)))
.collect(),
Expand Down Expand Up @@ -420,7 +420,7 @@ mod test {
addr: "0.0.0.2:1".parse().unwrap(),
source: vec![Packet::Connected(connected::Packet::FrameSet(FrameSet {
seq_num: 0.into(),
set: Frames::new(),
set: FramesMut::new(),
}))]
.into_iter()
.collect(),
Expand Down

0 comments on commit b3baadd

Please sign in to comment.