Improve overall multiplex performance for rlpx satellite stream #13856

Open
mattsse opened this issue Jan 18, 2025 · 2 comments · May be fixed by #13861
Labels
A-networking: Related to networking in general
C-enhancement: New feature or request
C-perf: A change motivated by improving speed, memory usage or disk footprint
D-good-first-issue: Nice and easy! A great choice to get started

Comments

@mattsse
Collaborator

mattsse commented Jan 18, 2025

Describe the feature

The Sink impl for

impl<St, Primary, T> Sink<T> for RlpxSatelliteStream<St, Primary>
where
    St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
    Primary: Sink<T> + Unpin,
    P2PStreamError: Into<<Primary as Sink<T>>::Error>,
{
    type Error = <Primary as Sink<T>>::Error;

has a small flaw that could starve rlpx satellite impls.

This is because poll_ready and start_send never touch the satellite output buffer, so messages sent on the primary always bypass the satellite messages that are currently buffered:

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        if let Err(err) = ready!(this.inner.conn.poll_ready_unpin(cx)) {
            return Poll::Ready(Err(err.into()))
        }
        if let Err(err) = ready!(this.primary.st.poll_ready_unpin(cx)) {
            return Poll::Ready(Err(err))
        }
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        self.get_mut().primary.st.start_send_unpin(item)
    }

To fix this, we should move the sink logic that is currently embedded in the Stream impl into the Sink impl:

    loop {
        match this.inner.conn.poll_ready_unpin(cx) {
            Poll::Ready(Ok(())) => {
                if let Some(msg) = this.inner.out_buffer.pop_front() {
                    if let Err(err) = this.inner.conn.start_send_unpin(msg) {
                        return Poll::Ready(Some(Err(err.into())))
                    }
                } else {
                    break
                }
            }
            // ... (remaining match arms elided)
        }
    }

This way, poll_ready always drains the output buffer of satellite messages first, treating all messages first come, first served (FCFS).
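The proposed ordering can be illustrated with a minimal, std-only sketch. This is a simplified model, not the actual reth code: `MockConn`, `Multiplex`, and `send_order` are hypothetical names, the `Context` argument and `Pin` are dropped for brevity, and the primary is assumed to write straight to the connection.

```rust
use std::collections::VecDeque;
use std::io;
use std::task::Poll;

/// Hypothetical stand-in for the shared connection: it accepts up to
/// `capacity` messages and records the order in which they were sent.
struct MockConn {
    capacity: usize,
    sent: Vec<String>,
}

impl MockConn {
    fn poll_ready(&mut self) -> Poll<io::Result<()>> {
        if self.sent.len() < self.capacity {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }

    fn start_send(&mut self, msg: String) -> io::Result<()> {
        self.sent.push(msg);
        Ok(())
    }
}

/// Simplified model of the multiplexer: satellite messages wait in `out_buffer`.
struct Multiplex {
    conn: MockConn,
    out_buffer: VecDeque<String>,
}

impl Multiplex {
    /// Flush buffered satellite messages before reporting readiness,
    /// so that a primary send can never overtake them.
    fn poll_ready(&mut self) -> Poll<io::Result<()>> {
        loop {
            match self.conn.poll_ready() {
                Poll::Ready(Ok(())) => match self.out_buffer.pop_front() {
                    Some(msg) => {
                        if let Err(err) = self.conn.start_send(msg) {
                            return Poll::Ready(Err(err));
                        }
                    }
                    // Buffer is empty and the connection is ready.
                    None => return Poll::Ready(Ok(())),
                },
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                // Backpressure: leave the remaining messages buffered.
                Poll::Pending => return Poll::Pending,
            }
        }
    }

    /// Primary send; assumed here to go directly to the connection.
    fn start_send(&mut self, msg: String) -> io::Result<()> {
        self.conn.start_send(msg)
    }
}

/// Queues two satellite messages, then sends one primary message,
/// and returns the order in which the connection saw them.
fn send_order() -> Vec<String> {
    let mut mux = Multiplex {
        conn: MockConn { capacity: 8, sent: Vec::new() },
        out_buffer: VecDeque::from(vec!["sat-1".to_string(), "sat-2".to_string()]),
    };
    if let Poll::Ready(Ok(())) = mux.poll_ready() {
        mux.start_send("primary-1".to_string()).unwrap();
    }
    mux.conn.sent
}
```

In this model `send_order()` yields the two satellite messages ahead of the primary one. With a connection that is full, `poll_ready` returns `Poll::Pending` and the unsent satellite messages stay queued, so the primary cannot jump ahead of them.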

FYI @0xvanbeethoven @Will-Smith11


mattsse added the C-enhancement and S-needs-triage labels on Jan 18, 2025
mattsse added the D-good-first-issue, C-perf, and A-networking labels and removed the S-needs-triage label on Jan 18, 2025
@mattsse
Collaborator Author

mattsse commented Jan 18, 2025

assigning @hoank101

@chiscookeke11

Can I take this issue?

Projects
Status: Todo