-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathopts.rs
215 lines (186 loc) · 5.92 KB
/
opts.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use fastrace::collector::TraceId;
use futures::{Sink, SinkExt};
use pin_project_lite::pin_project;
use crate::guard::SendBuffer;
use crate::link::SharedLink;
use crate::packet::connected::FrameBody;
use crate::utils::timestamp;
use crate::{Message, Peer};
/// Trace info extension for server.
///
/// Implementors expose a [`TraceId`] they have recorded; no implementation is
/// visible in this file, so the exact bookkeeping is up to the implementor.
pub trait TraceInfo {
/// Returns the last trace id, or `None` when the implementor has none to report.
///
/// NOTE(review): what counts as the "last" trace is defined by implementors
/// outside this file — confirm against them before relying on it.
fn last_trace_id(&self) -> Option<TraceId>;
}
/// Obtain the connection information.
///
/// Read-only accessors describing the peer on the other end of a connection.
pub trait ConnectionInfo {
/// Returns the MTU associated with the connection.
fn mtu(&self) -> u16;
/// Returns the socket address of the remote peer.
fn remote_addr(&self) -> SocketAddr;
/// Returns the GUID associated with the connection's peer.
fn guid(&self) -> u64;
}
/// Crate-internal extension that pairs a value with a [`Peer`].
///
/// The resulting [`ConnectionInfoWrapper`] carries the peer alongside the
/// original value.
pub(crate) trait WrapConnectionInfo: Sized {
/// Consumes `self` and returns it wrapped together with `peer`.
fn wrap_connection_info(self, peer: Peer) -> ConnectionInfoWrapper<Self>;
}
pin_project! {
/// Wrapper that stores an inner value next to the [`Peer`] it belongs to.
///
/// `inner` is structurally pinned (`#[pin]`), so projecting a pinned wrapper
/// yields a pinned reference to the inner value.
pub(crate) struct ConnectionInfoWrapper<I> {
// The wrapped value; pinned projection is generated for it.
#[pin]
inner: I,
// Peer details kept alongside the wrapped value.
peer: Peer,
}
}
/// Answers every [`ConnectionInfo`] query from the stored [`Peer`]; the wrapped
/// value is never consulted, so no bound on `I` is needed.
impl<I> ConnectionInfo for ConnectionInfoWrapper<I> {
    /// MTU recorded on the stored peer.
    fn mtu(&self) -> u16 {
        let peer = &self.peer;
        peer.mtu
    }

    /// Address recorded on the stored peer.
    fn remote_addr(&self) -> SocketAddr {
        let peer = &self.peer;
        peer.addr
    }

    /// GUID recorded on the stored peer.
    fn guid(&self) -> u64 {
        let peer = &self.peer;
        peer.guid
    }
}
impl<T, I: Sink<T>> Sink<T> for ConnectionInfoWrapper<I> {
type Error = I::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
self.project().inner.start_send(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx)
}
}
/// Blanket implementation: any sink of [`Message`] can be wrapped with its peer.
impl<S> WrapConnectionInfo for S
where
    S: Sink<Message>,
{
    /// Moves `self` and `peer` into a new [`ConnectionInfoWrapper`].
    fn wrap_connection_info(self, peer: Peer) -> ConnectionInfoWrapper<Self> {
        let inner = self;
        ConnectionInfoWrapper { inner, peer }
    }
}
/// Ping extension for client, experimental.
pub trait Ping {
/// Sends a ping through the pinned receiver.
///
/// Returns a `Send` future that resolves to `Ok(())` on success or to the
/// [`io::Error`] reported while sending.
fn ping(self: Pin<&mut Self>) -> impl Future<Output = Result<(), io::Error>> + Send;
}
/// Blanket implementation for every `Send` sink of [`FrameBody`] whose error
/// type is [`io::Error`].
impl<S> Ping for S
where
    S: Sink<FrameBody, Error = io::Error> + Send,
{
    /// Builds a `ConnectedPing` frame stamped with the current `timestamp()` and
    /// sends it through the sink, propagating the sink's result.
    async fn ping(mut self: Pin<&mut Self>) -> Result<(), io::Error> {
        let client_timestamp = timestamp();
        let frame = FrameBody::ConnectedPing { client_timestamp };
        self.send(frame).await
    }
}
/// Flush strategy can be used as ext data of [`std::task::Context`] to guide how
/// [`Sink::poll_flush`] performs a flush. The results of the flush are stored here as well.
/// The default strategy will flush all buffers.
///
/// Customizing your own strategy can achieve many features:
///
/// 1. [**Delayed ack**](https://en.wikipedia.org/wiki/TCP_delayed_acknowledgment) based on timing,
///    thereby reducing the number of ack packets and improving bandwidth utilization. At the same
///    time, sending based on timing can avoid deadlocks or regressions caused by delaying based on
///    the number of packets.
///
/// 2. More aggressive nack/pack flush strategy which would be more beneficial for retransmitting
///    packets.
///
/// After the flush is completed, the strategy will store the number of frames that have been
/// flushed. You can use this number to determine when to take the next flush.
///
/// Note that it can only be used in [`Sink::poll_flush`].
#[derive(Debug, Default, Clone, Copy)]
pub struct FlushStrategy {
    // Each tag is `-1` while that kind of flush is disabled; otherwise it holds the
    // number of frames flushed so far. The derived `Default` zero-initialises every
    // tag, i.e. all three kinds are enabled with nothing flushed yet.
    ack_tag: isize,
    nack_tag: isize,
    pack_tag: isize,
}

impl FlushStrategy {
    /// Tag value marking a kind of flush as disabled.
    const DISABLED: isize = -1;

    /// Create a new flush strategy with specified flush options.
    ///
    /// Each flag enables flushing (and counting) of the corresponding kind of
    /// frame; a disabled kind is skipped by the flush and its counter cannot be read.
    pub fn new(ack: bool, nack: bool, pack: bool) -> Self {
        FlushStrategy {
            ack_tag: Self::initial_tag(ack),
            nack_tag: Self::initial_tag(nack),
            pack_tag: Self::initial_tag(pack),
        }
    }

    /// Get how many ack frames have been flushed.
    ///
    /// # Panics
    /// It will panic if ack flush is not enabled.
    pub fn flushed_ack(&self) -> usize {
        assert!(
            self.ack_tag != Self::DISABLED,
            "you should enable flush ack before checking result of flushed ack"
        );
        // An enabled tag is never negative, so the cast is lossless.
        self.ack_tag as usize
    }

    /// Get how many nack frames have been flushed.
    ///
    /// # Panics
    /// It will panic if nack flush is not enabled.
    pub fn flushed_nack(&self) -> usize {
        assert!(
            self.nack_tag != Self::DISABLED,
            "you should enable flush nack before checking result of flushed nack"
        );
        // An enabled tag is never negative, so the cast is lossless.
        self.nack_tag as usize
    }

    /// Get how many pack frames have been flushed.
    ///
    /// # Panics
    /// It will panic if pack flush is not enabled.
    pub fn flushed_pack(&self) -> usize {
        assert!(
            self.pack_tag != Self::DISABLED,
            "you should enable flush pack before checking result of flushed pack"
        );
        // An enabled tag is never negative, so the cast is lossless.
        self.pack_tag as usize
    }

    /// Returns `true` when every enabled kind of flush has nothing left to send.
    ///
    /// Disabled kinds are ignored. For packs, both the link's unconnected queue
    /// and the send buffer must be empty.
    pub(crate) fn check_flushed(&self, link: &SharedLink, buf: &SendBuffer) -> bool {
        let mut ret = true;
        if self.flush_ack() {
            ret &= link.outgoing_ack_empty();
        }
        if self.flush_nack() {
            ret &= link.outgoing_nack_empty();
        }
        if self.flush_pack() {
            ret &= link.unconnected_empty() && buf.is_empty();
        }
        ret
    }

    /// Whether ack flush is enabled.
    pub(crate) fn flush_ack(&self) -> bool {
        self.ack_tag != Self::DISABLED
    }

    /// Whether nack flush is enabled.
    pub(crate) fn flush_nack(&self) -> bool {
        self.nack_tag != Self::DISABLED
    }

    /// Whether pack flush is enabled.
    pub(crate) fn flush_pack(&self) -> bool {
        self.pack_tag != Self::DISABLED
    }

    /// Records `cnt` more flushed ack frames; a no-op when ack flush is disabled.
    pub(crate) fn mark_flushed_ack(&mut self, cnt: usize) {
        Self::accumulate(&mut self.ack_tag, cnt);
    }

    /// Records `cnt` more flushed nack frames; a no-op when nack flush is disabled.
    pub(crate) fn mark_flushed_nack(&mut self, cnt: usize) {
        Self::accumulate(&mut self.nack_tag, cnt);
    }

    /// Records `cnt` more flushed pack frames; a no-op when pack flush is disabled.
    pub(crate) fn mark_flushed_pack(&mut self, cnt: usize) {
        Self::accumulate(&mut self.pack_tag, cnt);
    }

    /// Initial tag for one kind of flush: zero flushed frames when enabled, the
    /// disabled sentinel otherwise.
    fn initial_tag(enabled: bool) -> isize {
        if enabled {
            0
        } else {
            Self::DISABLED
        }
    }

    /// Adds `cnt` to an enabled tag, leaving a disabled tag untouched.
    ///
    /// The count is clamped to `isize::MAX` before the cast and the addition
    /// saturates, so a huge count can neither wrap into a negative delta (which
    /// could turn an enabled tag into the disabled sentinel) nor overflow.
    fn accumulate(tag: &mut isize, cnt: usize) {
        if *tag == Self::DISABLED {
            return;
        }
        let delta = cnt.min(isize::MAX as usize) as isize;
        *tag = (*tag).saturating_add(delta);
    }
}