Skip to content

Commit

Permalink
add ConnectionInfo
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Oct 23, 2024
1 parent 648db34 commit b7bd9ed
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

- Add `serde` feature to support config serializing and deserializing
- Add `ConnectionInfo`

---
## 0.1.4
Expand Down
63 changes: 63 additions & 0 deletions src/opts.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,83 @@
use std::collections::VecDeque;
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

use fastrace::collector::TraceId;
use futures::{Sink, SinkExt};
use pin_project_lite::pin_project;

use crate::link::SharedLink;
use crate::packet::connected::{Frame, FrameBody};
use crate::utils::timestamp;
use crate::{Message, Peer};

/// Trace info extension for server
pub trait TraceInfo {
fn last_trace_id(&self) -> Option<TraceId>;
}

/// Obtain the connection information
pub trait ConnectionInfo {
fn mtu(&self) -> u16;
fn remote_addr(&self) -> SocketAddr;
fn guid(&self) -> u64;
}

pub(crate) trait WrapConnectionInfo: Sized {
fn wrap_connection_info(self, peer: Peer) -> ConnectionInfoWrapper<Self>;
}

pin_project! {
pub(crate) struct ConnectionInfoWrapper<I> {
#[pin]
inner: I,
peer: Peer,
}
}

impl<I> ConnectionInfo for ConnectionInfoWrapper<I> {
fn mtu(&self) -> u16 {
self.peer.mtu
}

fn remote_addr(&self) -> SocketAddr {
self.peer.addr
}

fn guid(&self) -> u64 {
self.peer.guid
}
}

impl<T, I: Sink<T>> Sink<T> for ConnectionInfoWrapper<I> {
type Error = I::Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_ready(cx)
}

fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
self.project().inner.start_send(item)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx)
}
}

impl<S: Sink<Message>> WrapConnectionInfo for S {
fn wrap_connection_info(self, peer: Peer) -> ConnectionInfoWrapper<Self> {
ConnectionInfoWrapper { inner: self, peer }
}
}

/// Ping extension for client, experimental
pub trait Ping {
fn ping(self: Pin<&mut Self>) -> impl Future<Output = Result<(), io::Error>> + Send;
Expand Down
4 changes: 2 additions & 2 deletions src/server/incoming/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bytes::Bytes;
use futures::{Sink, Stream};

use super::handler::offline;
use crate::opts::TraceInfo;
use crate::opts::{ConnectionInfo, TraceInfo};
use crate::{codec, Message, Role};

/// Incoming implementation by using tokio's UDP framework
Expand Down Expand Up @@ -175,7 +175,7 @@ pub trait MakeIncoming: Sized {
) -> impl Stream<
Item = (
impl Stream<Item = Bytes> + TraceInfo,
impl Sink<Message, Error = io::Error>,
impl Sink<Message, Error = io::Error> + ConnectionInfo,
),
>;
}
9 changes: 5 additions & 4 deletions src/server/incoming/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::codec::frame::Framed;
use crate::codec::{Decoded, Encoded};
use crate::guard::HandleOutgoing;
use crate::link::{Route, TransferLink};
use crate::opts::TraceInfo;
use crate::opts::{ConnectionInfo, TraceInfo, WrapConnectionInfo};
use crate::server::handler::offline::OfflineHandler;
use crate::server::handler::online::HandleOnline;
use crate::state::{CloseOnDrop, IncomingStateManage, OutgoingStateManage};
Expand All @@ -44,7 +44,7 @@ impl MakeIncoming for TokioUdpSocket {
) -> impl Stream<
Item = (
impl Stream<Item = Bytes> + TraceInfo,
impl Sink<Message, Error = io::Error>,
impl Sink<Message, Error = io::Error> + ConnectionInfo,
),
> {
let socket = Arc::new(self);
Expand All @@ -64,7 +64,7 @@ impl MakeIncoming for TokioUdpSocket {
impl Stream for Incoming {
type Item = (
impl Stream<Item = Bytes> + TraceInfo,
impl Sink<Message, Error = io::Error>,
impl Sink<Message, Error = io::Error> + ConnectionInfo,
);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -102,7 +102,8 @@ impl Stream for Incoming {
.manage_outgoing_state(Some(CloseOnDrop::new(
peer.addr,
Arc::clone(this.close_events),
)));
)))
.wrap_connection_info(peer);

let src = route
.frame_decoded(this.config.codec_config())
Expand Down

0 comments on commit b7bd9ed

Please sign in to comment.