diff --git a/Cargo.lock b/Cargo.lock index b62ee9676ac6..51e462ae7d59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -842,9 +842,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.7.6" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f33bc887064ef1fd66020c9adfc45bb9f33d75a42096c81e7c56c65b75dd1a8b" +checksum = "e50ae3f04d169fcc9bde0b547d1c205219b7157e07ded9c5aff03e0637cb3ed7" dependencies = [ "libc", "log", diff --git a/crates/shadowsocks-service/src/config.rs b/crates/shadowsocks-service/src/config.rs index af68a4f3bb50..2e89b2ab2707 100644 --- a/crates/shadowsocks-service/src/config.rs +++ b/crates/shadowsocks-service/src/config.rs @@ -236,7 +236,7 @@ cfg_if! { /// For Linux-like systems' Netfilter `REDIRECT`. Only for TCP connections. /// - /// This is supported from Linux 2.4 Kernel. Document: https://www.netfilter.org/documentation/index.html#documentation-howto + /// This is supported from Linux 2.4 Kernel. Document: /// /// NOTE: Filter rule `REDIRECT` can only be applied to TCP connections. #[cfg(any(target_os = "linux", target_os = "android"))] @@ -252,7 +252,7 @@ cfg_if! { /// /// Supported by OpenBSD 3.0+, FreeBSD 5.3+, NetBSD 3.0+, Solaris 11.3+, macOS 10.7+, iOS, QNX /// - /// Document: https://www.freebsd.org/doc/handbook/firewalls-pf.html + /// Document: #[cfg(any( target_os = "openbsd", target_os = "freebsd", diff --git a/crates/shadowsocks-service/src/local/net/mod.rs b/crates/shadowsocks-service/src/local/net/mod.rs index a9f4290df558..35cc82c0cf59 100644 --- a/crates/shadowsocks-service/src/local/net/mod.rs +++ b/crates/shadowsocks-service/src/local/net/mod.rs @@ -1,6 +1,9 @@ //! Shadowsocks Local Network Utilities -pub use self::{auto_proxy_io::AutoProxyIo, auto_proxy_stream::AutoProxyClientStream}; +pub use self::{ + tcp::{auto_proxy_io::AutoProxyIo, auto_proxy_stream::AutoProxyClientStream}, + udp::{UdpAssociationManager, UdpInboundWrite}, +}; -mod auto_proxy_io; -mod auto_proxy_stream; +mod tcp; +mod udp; diff --git a/crates/shadowsocks-service/src/local/net/auto_proxy_io.rs b/crates/shadowsocks-service/src/local/net/tcp/auto_proxy_io.rs similarity index 100% rename from crates/shadowsocks-service/src/local/net/auto_proxy_io.rs rename to crates/shadowsocks-service/src/local/net/tcp/auto_proxy_io.rs diff --git a/crates/shadowsocks-service/src/local/net/auto_proxy_stream.rs b/crates/shadowsocks-service/src/local/net/tcp/auto_proxy_stream.rs similarity index 100% rename from crates/shadowsocks-service/src/local/net/auto_proxy_stream.rs rename to crates/shadowsocks-service/src/local/net/tcp/auto_proxy_stream.rs diff --git a/crates/shadowsocks-service/src/local/net/tcp/mod.rs b/crates/shadowsocks-service/src/local/net/tcp/mod.rs new file mode 100644 index 000000000000..b2ecb9ecf821 --- /dev/null +++ b/crates/shadowsocks-service/src/local/net/tcp/mod.rs @@ -0,0 +1,2 @@ +pub mod auto_proxy_io; +pub mod auto_proxy_stream; diff --git a/crates/shadowsocks-service/src/local/net/udp/association.rs b/crates/shadowsocks-service/src/local/net/udp/association.rs new file mode 100644 index 000000000000..3b38c7ac6ca7 --- /dev/null +++ b/crates/shadowsocks-service/src/local/net/udp/association.rs @@ -0,0 +1,615 @@ +//! UDP Association Managing + +use std::{ + io::{self, ErrorKind}, + net::SocketAddr, + sync::Arc, + time::Duration, +}; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::{self, AbortHandle}; +use log::{debug, error, trace, warn}; +use lru_time_cache::{Entry, LruCache}; +use shadowsocks::{ + lookup_then, + net::UdpSocket as ShadowUdpSocket, + relay::{ + udprelay::{ProxySocket, MAXIMUM_UDP_PAYLOAD_SIZE}, + Address, + }, +}; +use spin::Mutex as SpinMutex; +use tokio::{ + net::UdpSocket, + sync::{mpsc, Mutex}, + time, +}; + +use crate::{ + local::{context::ServiceContext, loadbalancing::PingBalancer}, + net::MonProxySocket, +}; + +/// Writer for sending packets back to client +/// +/// Currently it requires `async-trait` for `async fn` in trait, which will allocate a `Box`ed `Future` every call of `send_to`. +/// This performance issue could be solved when `generic_associated_types` and `generic_associated_types` are stablized. +#[async_trait] +pub trait UdpInboundWrite { + /// Sends packet `data` received from `remote_addr` back to `peer_addr` + async fn send_to(&self, peer_addr: SocketAddr, remote_addr: &Address, data: &[u8]) -> io::Result<()>; +} + +/// UDP association manager +pub struct UdpAssociationManager +where + W: UdpInboundWrite + Clone + Send + Sync + Unpin + 'static, +{ + respond_writer: W, + context: Arc, + assoc_map: Arc>>>, + cleanup_abortable: AbortHandle, + balancer: PingBalancer, +} + +impl Drop for UdpAssociationManager +where + W: UdpInboundWrite + Clone + Send + Sync + Unpin + 'static, +{ + fn drop(&mut self) { + self.cleanup_abortable.abort(); + } +} + +impl UdpAssociationManager +where + W: UdpInboundWrite + Clone + Send + Sync + Unpin + 'static, +{ + /// Create a new `UdpAssociationManager` + pub fn new( + context: Arc, + respond_writer: W, + time_to_live: Option, + capacity: Option, + balancer: PingBalancer, + ) -> UdpAssociationManager { + let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION); + let assoc_map = Arc::new(Mutex::new(match capacity { + Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity), + None => LruCache::with_expiry_duration(time_to_live), + })); + + let cleanup_abortable = { + let assoc_map = assoc_map.clone(); + let (cleanup_task, cleanup_abortable) = future::abortable(async move { + loop { + time::sleep(time_to_live).await; + + // iter() will trigger a cleanup of expired associations + let _ = assoc_map.lock().await.iter(); + } + }); + tokio::spawn(cleanup_task); + cleanup_abortable + }; + + UdpAssociationManager { + respond_writer, + context, + assoc_map, + cleanup_abortable, + balancer, + } + } + + /// Sends `data` from `peer_addr` to `target_addr` + pub async fn send_to(&self, peer_addr: SocketAddr, target_addr: Address, data: &[u8]) -> io::Result<()> { + // Check or (re)create an association + match self.assoc_map.lock().await.entry(peer_addr) { + Entry::Occupied(occ) => { + let assoc = occ.into_mut(); + assoc.try_send((target_addr, Bytes::copy_from_slice(data))) + } + Entry::Vacant(vac) => { + let assoc = vac.insert(UdpAssociation::new( + self.context.clone(), + peer_addr, + self.assoc_map.clone(), + self.balancer.clone(), + self.respond_writer.clone(), + )); + trace!("created udp association for {}", peer_addr); + assoc.try_send((target_addr, Bytes::copy_from_slice(data))) + } + } + } +} + +struct UdpAssociation +where + W: UdpInboundWrite + Send + Sync + Unpin + 'static, +{ + assoc: Arc>, + sender: mpsc::Sender<(Address, Bytes)>, +} + +impl Drop for UdpAssociation +where + W: UdpInboundWrite + Send + Sync + Unpin + 'static, +{ + fn drop(&mut self) { + self.assoc.bypassed_ipv4_socket.lock().abort(); + self.assoc.bypassed_ipv6_socket.lock().abort(); + self.assoc.proxied_socket.lock().abort(); + } +} + +impl UdpAssociation +where + W: UdpInboundWrite + Send + Sync + Unpin + 'static, +{ + fn new( + context: Arc, + peer_addr: SocketAddr, + assoc_map: Arc>>>, + balancer: PingBalancer, + respond_writer: W, + ) -> UdpAssociation { + let (assoc, sender) = UdpAssociationContext::new(context, peer_addr, assoc_map, balancer, respond_writer); + UdpAssociation { assoc, sender } + } + + fn try_send(&self, data: (Address, Bytes)) -> io::Result<()> { + if let Err(..) = self.sender.try_send(data) { + let err = io::Error::new(ErrorKind::Other, "udp relay channel full"); + return Err(err); + } + Ok(()) + } +} + +enum UdpAssociationBypassState { + Empty, + Connected { + socket: Arc, + abortable: AbortHandle, + }, + Aborted, +} + +impl Drop for UdpAssociationBypassState { + fn drop(&mut self) { + if let UdpAssociationBypassState::Connected { ref abortable, .. } = *self { + abortable.abort(); + } + } +} + +impl UdpAssociationBypassState { + fn empty() -> UdpAssociationBypassState { + UdpAssociationBypassState::Empty + } + + fn set_connected(&mut self, socket: Arc, abortable: AbortHandle) { + *self = UdpAssociationBypassState::Connected { socket, abortable }; + } + + fn abort(&mut self) { + *self = UdpAssociationBypassState::Aborted; + } +} + +enum UdpAssociationProxyState { + Empty, + Connected { + socket: Arc, + abortable: AbortHandle, + }, + Aborted, +} + +impl Drop for UdpAssociationProxyState { + fn drop(&mut self) { + if let UdpAssociationProxyState::Connected { ref abortable, .. } = *self { + abortable.abort(); + } + } +} + +impl UdpAssociationProxyState { + fn empty() -> UdpAssociationProxyState { + UdpAssociationProxyState::Empty + } + + fn reset(&mut self) { + *self = UdpAssociationProxyState::Empty; + } + + fn set_connected(&mut self, socket: Arc, abortable: AbortHandle) { + *self = UdpAssociationProxyState::Connected { socket, abortable }; + } + + fn abort(&mut self) { + *self = UdpAssociationProxyState::Aborted; + } +} + +struct UdpAssociationContext +where + W: UdpInboundWrite + Send + Sync + Unpin + 'static, +{ + context: Arc, + peer_addr: SocketAddr, + bypassed_ipv4_socket: SpinMutex, + bypassed_ipv6_socket: SpinMutex, + proxied_socket: SpinMutex, + assoc_map: Arc>>>, + balancer: PingBalancer, + respond_writer: W, +} + +impl Drop for UdpAssociationContext +where + W: UdpInboundWrite + Send + Sync + Unpin + 'static, +{ + fn drop(&mut self) { + trace!("udp association for {} is closed", self.peer_addr); + } +} + +impl UdpAssociationContext +where + W: UdpInboundWrite + Send + Sync + Unpin + 'static, +{ + fn new( + context: Arc, + peer_addr: SocketAddr, + assoc_map: Arc>>>, + balancer: PingBalancer, + respond_writer: W, + ) -> (Arc>, mpsc::Sender<(Address, Bytes)>) { + // Pending packets 1024 should be good enough for a server. + // If there are plenty of packets stuck in the channel, dropping exccess packets is a good way to protect the server from + // being OOM. + let (sender, receiver) = mpsc::channel(1024); + + let assoc = Arc::new(UdpAssociationContext { + context, + peer_addr, + bypassed_ipv4_socket: SpinMutex::new(UdpAssociationBypassState::empty()), + bypassed_ipv6_socket: SpinMutex::new(UdpAssociationBypassState::empty()), + proxied_socket: SpinMutex::new(UdpAssociationProxyState::empty()), + assoc_map, + balancer, + respond_writer, + }); + + let l2r_task = { + let assoc = assoc.clone(); + assoc.copy_l2r(receiver) + }; + tokio::spawn(l2r_task); + + (assoc, sender) + } + + async fn copy_l2r(self: Arc, mut receiver: mpsc::Receiver<(Address, Bytes)>) { + while let Some((target_addr, data)) = receiver.recv().await { + let bypassed = self.context.check_target_bypassed(&target_addr).await; + + trace!( + "udp relay {} -> {} ({}) with {} bytes", + self.peer_addr, + target_addr, + if bypassed { "bypassed" } else { "proxied" }, + data.len() + ); + + let assoc = self.clone(); + if bypassed { + if let Err(err) = assoc.copy_bypassed_l2r(&target_addr, &data).await { + error!( + "udp relay {} -> {} (bypassed) with {} bytes, error: {}", + self.peer_addr, + target_addr, + data.len(), + err + ); + } + } else { + if let Err(err) = assoc.copy_proxied_l2r(&target_addr, &data).await { + error!( + "udp relay {} -> {} (proxied) with {} bytes, error: {}", + self.peer_addr, + target_addr, + data.len(), + err + ); + } + } + } + } + + async fn copy_bypassed_l2r(self: Arc, target_addr: &Address, data: &[u8]) -> io::Result<()> { + match *target_addr { + Address::SocketAddress(sa) => match sa { + SocketAddr::V4(..) => self.copy_bypassed_ipv4_l2r(sa, data).await, + SocketAddr::V6(..) => self.copy_bypassed_ipv6_l2r(sa, data).await, + }, + Address::DomainNameAddress(ref dname, port) => { + lookup_then!(self.context.context_ref(), dname, port, |sa| { + match sa { + SocketAddr::V4(..) => self.clone().copy_bypassed_ipv4_l2r(sa, data).await, + SocketAddr::V6(..) => self.clone().copy_bypassed_ipv6_l2r(sa, data).await, + } + }) + .map(|_| ()) + } + } + } + + async fn copy_bypassed_ipv4_l2r(self: Arc, target_addr: SocketAddr, data: &[u8]) -> io::Result<()> { + let socket = { + let mut handle = self.bypassed_ipv4_socket.lock(); + + match *handle { + UdpAssociationBypassState::Empty => { + // Create a new connection to proxy server + + let socket = + ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?; + let socket: Arc = Arc::new(socket.into()); + + let (r2l_fut, r2l_abortable) = { + let assoc = self.clone(); + future::abortable(assoc.copy_bypassed_r2l(socket.clone())) + }; + + // CLIENT <- REMOTE + tokio::spawn(r2l_fut); + debug!( + "created udp association for {} (bypassed) with {:?}", + self.peer_addr, + self.context.connect_opts_ref() + ); + + handle.set_connected(socket.clone(), r2l_abortable); + socket + } + UdpAssociationBypassState::Connected { ref socket, .. } => socket.clone(), + UdpAssociationBypassState::Aborted => { + debug!( + "udp association for {} (bypassed) have been aborted, dropped packet {} bytes to {}", + self.peer_addr, + data.len(), + target_addr + ); + return Ok(()); + } + } + }; + + let n = socket.send_to(data, target_addr).await?; + if n != data.len() { + warn!( + "{} -> {} sent {} bytes != expected {} bytes", + self.peer_addr, + target_addr, + n, + data.len() + ); + } + + Ok(()) + } + + async fn copy_bypassed_ipv6_l2r(self: Arc, target_addr: SocketAddr, data: &[u8]) -> io::Result<()> { + let socket = { + let mut handle = self.bypassed_ipv6_socket.lock(); + + match *handle { + UdpAssociationBypassState::Empty => { + // Create a new connection to proxy server + + let socket = + ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?; + let socket: Arc = Arc::new(socket.into()); + + let (r2l_fut, r2l_abortable) = { + let assoc = self.clone(); + future::abortable(assoc.copy_bypassed_r2l(socket.clone())) + }; + + // CLIENT <- REMOTE + tokio::spawn(r2l_fut); + debug!( + "created udp association for {} (bypassed) with {:?}", + self.peer_addr, + self.context.connect_opts_ref() + ); + + handle.set_connected(socket.clone(), r2l_abortable); + socket + } + UdpAssociationBypassState::Connected { ref socket, .. } => socket.clone(), + UdpAssociationBypassState::Aborted => { + debug!( + "udp association for {} (bypassed) have been aborted, dropped packet {} bytes to {}", + self.peer_addr, + data.len(), + target_addr + ); + return Ok(()); + } + } + }; + + let n = socket.send_to(data, target_addr).await?; + if n != data.len() { + warn!( + "{} -> {} sent {} bytes != expected {} bytes", + self.peer_addr, + target_addr, + n, + data.len() + ); + } + + Ok(()) + } + + async fn copy_proxied_l2r(self: Arc, target_addr: &Address, data: &[u8]) -> io::Result<()> { + let mut last_err = io::Error::new(ErrorKind::Other, "udp relay sendto failed after retry"); + + for tried in 0..3 { + let socket = { + let mut handle = self.proxied_socket.lock(); + + match *handle { + UdpAssociationProxyState::Empty => { + // Create a new connection to proxy server + + let server = self.balancer.best_udp_server(); + let svr_cfg = server.server_config(); + + let socket = ProxySocket::connect_with_opts( + self.context.context(), + svr_cfg, + self.context.connect_opts_ref(), + ) + .await?; + let socket = MonProxySocket::from_socket(socket, self.context.flow_stat()); + let socket = Arc::new(socket); + + let (r2l_fut, r2l_abortable) = { + let assoc = self.clone(); + future::abortable(assoc.copy_proxied_r2l(socket.clone())) + }; + + // CLIENT <- REMOTE + tokio::spawn(r2l_fut); + + debug!( + "created udp association for {} <-> {} (proxied) with {:?}", + self.peer_addr, + svr_cfg.addr(), + self.context.connect_opts_ref() + ); + + handle.set_connected(socket.clone(), r2l_abortable); + socket + } + UdpAssociationProxyState::Connected { ref socket, .. } => socket.clone(), + UdpAssociationProxyState::Aborted => { + debug!( + "udp association for {} (proxied) have been aborted, dropped packet {} bytes to {}", + self.peer_addr, + data.len(), + target_addr + ); + return Ok(()); + } + } + }; + + match socket.send(target_addr, data).await { + Ok(..) => return Ok(()), + Err(err) => { + debug!( + "{} -> {} (proxied) sending {} bytes failed, tried: {}, error: {}", + self.peer_addr, + target_addr, + data.len(), + tried + 1, + err + ); + last_err = err; + + // Reset for reconnecting + self.proxied_socket.lock().reset(); + + tokio::task::yield_now().await; + } + } + } + + Err(last_err) + } + + async fn copy_proxied_r2l(self: Arc, outbound: Arc) -> io::Result<()> { + let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE]; + loop { + let (n, addr) = match outbound.recv(&mut buffer).await { + Ok(n) => { + // Keep association alive in map + let _ = self.assoc_map.lock().await.get(&self.peer_addr); + n + } + Err(err) => { + // Socket that connected to remote server returns an error, it should be ECONNREFUSED in most cases. + // That indicates that the association on the server side have been dropped. + // + // There is no point to keep this socket. Drop it immediately. + self.proxied_socket.lock().reset(); + + error!( + "udp failed to receive from proxied outbound socket, peer_addr: {}, error: {}", + self.peer_addr, err + ); + time::sleep(Duration::from_secs(1)).await; + continue; + } + }; + + let data = &buffer[..n]; + + // Send back to client + if let Err(err) = self.respond_writer.send_to(self.peer_addr, &addr, data).await { + warn!( + "udp failed to send back to client {}, from target {}, error: {}", + self.peer_addr, addr, err + ); + continue; + } + + trace!("udp relay {} <- {} with {} bytes", self.peer_addr, addr, data.len()); + } + } + + async fn copy_bypassed_r2l(self: Arc, outbound: Arc) -> io::Result<()> { + let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE]; + loop { + let (n, addr) = match outbound.recv_from(&mut buffer).await { + Ok(n) => { + // Keep association alive in map + let _ = self.assoc_map.lock().await.get(&self.peer_addr); + n + } + Err(err) => { + error!( + "udp failed to receive from bypass outbound socket, peer_addr: {}, error: {}", + self.peer_addr, err + ); + time::sleep(Duration::from_secs(1)).await; + continue; + } + }; + + let data = &buffer[..n]; + let addr = Address::from(addr); + + // Send back to client + if let Err(err) = self.respond_writer.send_to(self.peer_addr, &addr, data).await { + warn!( + "udp failed to send back to client {}, from target {}, error: {}", + self.peer_addr, addr, err + ); + continue; + } + + trace!("udp relay {} <- {} with {} bytes", self.peer_addr, addr, data.len()); + } + } +} diff --git a/crates/shadowsocks-service/src/local/net/udp/mod.rs b/crates/shadowsocks-service/src/local/net/udp/mod.rs new file mode 100644 index 000000000000..1cd5d91ecd02 --- /dev/null +++ b/crates/shadowsocks-service/src/local/net/udp/mod.rs @@ -0,0 +1,3 @@ +pub use self::association::{UdpAssociationManager, UdpInboundWrite}; + +pub mod association; diff --git a/crates/shadowsocks-service/src/local/redir/redir_ext.rs b/crates/shadowsocks-service/src/local/redir/redir_ext.rs index b083444e6661..2478f742eb5d 100644 --- a/crates/shadowsocks-service/src/local/redir/redir_ext.rs +++ b/crates/shadowsocks-service/src/local/redir/redir_ext.rs @@ -13,6 +13,7 @@ use tokio::net::TcpListener; use crate::config::RedirType; +/// Extension function for `TcpListener` for setting extra options before `bind()` #[async_trait] pub trait TcpListenerRedirExt { // Create a TcpListener for transparent proxy @@ -21,6 +22,7 @@ pub trait TcpListenerRedirExt { async fn bind_redir(ty: RedirType, addr: SocketAddr) -> io::Result; } +/// Extension function for `TcpStream` for reading original destination address pub trait TcpStreamRedirExt { // Read destination address for TcpStream // @@ -28,6 +30,7 @@ pub trait TcpStreamRedirExt { fn destination_addr(&self, ty: RedirType) -> io::Result; } +/// `UdpSocket` that support transparent proxy pub trait UdpSocketRedir { /// Receive a single datagram from the socket. /// @@ -41,6 +44,7 @@ pub trait UdpSocketRedir { ) -> Poll>; } +/// Extension functions for `UdpSocket` to receive data with original destination address pub trait UdpSocketRedirExt { fn recv_dest_from<'a>(&'a self, buf: &'a mut [u8]) -> RecvDestFrom<'a, Self> where @@ -52,6 +56,7 @@ pub trait UdpSocketRedirExt { impl UdpSocketRedirExt for S where S: UdpSocketRedir {} +/// Future for `recv_dest_from` pub struct RecvDestFrom<'a, S: 'a> { socket: &'a S, buf: &'a mut [u8], diff --git a/crates/shadowsocks-service/src/local/redir/server.rs b/crates/shadowsocks-service/src/local/redir/server.rs index d5ad8c8b6d63..023d66ef52a6 100644 --- a/crates/shadowsocks-service/src/local/redir/server.rs +++ b/crates/shadowsocks-service/src/local/redir/server.rs @@ -100,7 +100,7 @@ impl Redir { } async fn run_udp_tunnel(&self, client_config: &ClientConfig, balancer: PingBalancer) -> io::Result<()> { - let mut server = UdpRedir::new( + let server = UdpRedir::new( self.context.clone(), self.udp_redir, self.udp_expiry_duration, diff --git a/crates/shadowsocks-service/src/local/redir/udprelay/mod.rs b/crates/shadowsocks-service/src/local/redir/udprelay/mod.rs index 79c0d4749aa2..7de047146d6b 100644 --- a/crates/shadowsocks-service/src/local/redir/udprelay/mod.rs +++ b/crates/shadowsocks-service/src/local/redir/udprelay/mod.rs @@ -7,48 +7,62 @@ use std::{ time::Duration, }; -use bytes::Bytes; -use futures::future::{self, AbortHandle}; -use log::{debug, error, info, trace, warn}; -use lru_time_cache::{Entry, LruCache}; +use async_trait::async_trait; +use log::{error, info, trace}; use shadowsocks::{ lookup_then, - net::UdpSocket as ShadowUdpSocket, - relay::{ - socks5::Address, - udprelay::{ProxySocket, MAXIMUM_UDP_PAYLOAD_SIZE}, - }, -}; -use spin::Mutex as SpinMutex; -use tokio::{ - net::UdpSocket, - sync::{mpsc, Mutex}, - time, + relay::{socks5::Address, udprelay::MAXIMUM_UDP_PAYLOAD_SIZE}, }; use crate::{ config::{ClientConfig, RedirType}, - local::{context::ServiceContext, loadbalancing::PingBalancer, redir::redir_ext::UdpSocketRedirExt}, - net::MonProxySocket, + local::{ + context::ServiceContext, + loadbalancing::PingBalancer, + net::{UdpAssociationManager, UdpInboundWrite}, + redir::redir_ext::UdpSocketRedirExt, + }, }; use self::sys::UdpRedirSocket; mod sys; -pub struct UdpRedir { - context: Arc, +#[derive(Clone)] +struct UdpRedirInboundWriter { redir_ty: RedirType, - assoc_map: Arc>>, - cleanup_abortable: AbortHandle, } -impl Drop for UdpRedir { - fn drop(&mut self) { - self.cleanup_abortable.abort(); +#[async_trait] +impl UdpInboundWrite for UdpRedirInboundWriter { + async fn send_to(&self, peer_addr: SocketAddr, remote_addr: &Address, data: &[u8]) -> io::Result<()> { + let addr = match *remote_addr { + Address::SocketAddress(sa) => sa, + Address::DomainNameAddress(..) => { + let err = io::Error::new( + ErrorKind::InvalidInput, + "redir destination must not be an domain name address", + ); + return Err(err); + } + }; + + // Create a socket binds to destination addr + // This only works for systems that supports binding to non-local addresses + let inbound = UdpRedirSocket::bind(self.redir_ty, addr)?; + + // Send back to client + inbound.send_to(data, peer_addr).await.map(|_| ()) } } +pub struct UdpRedir { + context: Arc, + redir_ty: RedirType, + time_to_live: Option, + capacity: Option, +} + impl UdpRedir { pub fn new( context: Arc, @@ -56,35 +70,15 @@ impl UdpRedir { time_to_live: Option, capacity: Option, ) -> UdpRedir { - let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION); - let assoc_map = Arc::new(Mutex::new(match capacity { - Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity), - None => LruCache::with_expiry_duration(time_to_live), - })); - - let cleanup_abortable = { - let assoc_map = assoc_map.clone(); - let (cleanup_task, cleanup_abortable) = future::abortable(async move { - loop { - time::sleep(time_to_live).await; - - // iter() will trigger a cleanup of expired associations - let _ = assoc_map.lock().await.iter(); - } - }); - tokio::spawn(cleanup_task); - cleanup_abortable - }; - UdpRedir { context, redir_ty, - assoc_map, - cleanup_abortable, + time_to_live, + capacity, } } - pub async fn run(&mut self, client_config: &ClientConfig, balancer: PingBalancer) -> io::Result<()> { + pub async fn run(&self, client_config: &ClientConfig, balancer: PingBalancer) -> io::Result<()> { let listener = match *client_config { ClientConfig::SocketAddr(ref saddr) => UdpRedirSocket::bind(self.redir_ty, *saddr)?, ClientConfig::DomainName(ref dname, port) => { @@ -98,6 +92,16 @@ impl UdpRedir { let local_addr = listener.local_addr().expect("determine port bound to"); info!("shadowsocks UDP redirect listening on {}", local_addr); + let manager = UdpAssociationManager::new( + self.context.clone(), + UdpRedirInboundWriter { + redir_ty: self.redir_ty, + }, + self.time_to_live, + self.capacity, + balancer, + ); + let mut pkt_buf = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE]; loop { let (recv_len, src, dst) = match listener.recv_dest_from(&mut pkt_buf).await { @@ -130,7 +134,7 @@ impl UdpRedir { continue; } - if let Err(err) = self.send_packet(src, dst, &balancer, pkt).await { + if let Err(err) = manager.send_to(src, Address::from(dst), pkt).await { error!( "udp packet relay {} -> {} with {} bytes failed, error: {}", src, @@ -141,519 +145,4 @@ impl UdpRedir { } } } - - async fn send_packet( - &mut self, - peer_addr: SocketAddr, - target_addr: SocketAddr, - balancer: &PingBalancer, - data: &[u8], - ) -> io::Result<()> { - // Check or (re)create an association - match self.assoc_map.lock().await.entry(peer_addr) { - Entry::Occupied(occ) => { - let assoc = occ.into_mut(); - assoc.try_send((target_addr, Bytes::copy_from_slice(data))) - } - Entry::Vacant(vac) => { - let assoc = vac.insert(UdpAssociation::new( - self.context.clone(), - self.redir_ty, - peer_addr, - self.assoc_map.clone(), - balancer.clone(), - )); - trace!("created udp association for {}", peer_addr); - assoc.try_send((target_addr, Bytes::copy_from_slice(data))) - } - } - } -} - -struct UdpAssociation { - assoc: Arc, - sender: mpsc::Sender<(SocketAddr, Bytes)>, -} - -impl Drop for UdpAssociation { - fn drop(&mut self) { - self.assoc.bypassed_ipv4_socket.lock().abort(); - self.assoc.bypassed_ipv6_socket.lock().abort(); - self.assoc.proxied_socket.lock().abort(); - } -} - -impl UdpAssociation { - fn new( - context: Arc, - redir_ty: RedirType, - peer_addr: SocketAddr, - assoc_map: Arc>>, - balancer: PingBalancer, - ) -> UdpAssociation { - let (assoc, sender) = UdpAssociationContext::new(context, redir_ty, peer_addr, assoc_map, balancer); - UdpAssociation { assoc, sender } - } - - fn try_send(&self, data: (SocketAddr, Bytes)) -> io::Result<()> { - if let Err(..) = self.sender.try_send(data) { - let err = io::Error::new(ErrorKind::Other, "udp relay channel full"); - return Err(err); - } - Ok(()) - } -} - -enum UdpAssociationBypassState { - Empty, - Connected { - socket: Arc, - abortable: AbortHandle, - }, - Aborted, -} - -impl Drop for UdpAssociationBypassState { - fn drop(&mut self) { - if let UdpAssociationBypassState::Connected { ref abortable, .. } = *self { - abortable.abort(); - } - } -} - -impl UdpAssociationBypassState { - fn empty() -> UdpAssociationBypassState { - UdpAssociationBypassState::Empty - } - - fn set_connected(&mut self, socket: Arc, abortable: AbortHandle) { - *self = UdpAssociationBypassState::Connected { socket, abortable }; - } - - fn abort(&mut self) { - *self = UdpAssociationBypassState::Aborted; - } -} - -enum UdpAssociationProxyState { - Empty, - Connected { - socket: Arc, - abortable: AbortHandle, - }, - Aborted, -} - -impl Drop for UdpAssociationProxyState { - fn drop(&mut self) { - if let UdpAssociationProxyState::Connected { ref abortable, .. } = *self { - abortable.abort(); - } - } -} - -impl UdpAssociationProxyState { - fn empty() -> UdpAssociationProxyState { - UdpAssociationProxyState::Empty - } - - fn reset(&mut self) { - *self = UdpAssociationProxyState::Empty; - } - - fn set_connected(&mut self, socket: Arc, abortable: AbortHandle) { - *self = UdpAssociationProxyState::Connected { socket, abortable }; - } - - fn abort(&mut self) { - *self = UdpAssociationProxyState::Aborted; - } -} - -struct UdpAssociationContext { - context: Arc, - redir_ty: RedirType, - peer_addr: SocketAddr, - bypassed_ipv4_socket: SpinMutex, - bypassed_ipv6_socket: SpinMutex, - proxied_socket: SpinMutex, - assoc_map: Arc>>, - balancer: PingBalancer, -} - -impl Drop for UdpAssociationContext { - fn drop(&mut self) { - trace!("udp association for {} is closed", self.peer_addr); - } -} - -impl UdpAssociationContext { - fn new( - context: Arc, - redir_ty: RedirType, - peer_addr: SocketAddr, - assoc_map: Arc>>, - balancer: PingBalancer, - ) -> (Arc, mpsc::Sender<(SocketAddr, Bytes)>) { - // Pending packets 1024 should be good enough for a server. - // If there are plenty of packets stuck in the channel, dropping exccess packets is a good way to protect the server from - // being OOM. - let (sender, receiver) = mpsc::channel(1024); - - let assoc = Arc::new(UdpAssociationContext { - context, - redir_ty, - peer_addr, - bypassed_ipv4_socket: SpinMutex::new(UdpAssociationBypassState::empty()), - bypassed_ipv6_socket: SpinMutex::new(UdpAssociationBypassState::empty()), - proxied_socket: SpinMutex::new(UdpAssociationProxyState::empty()), - assoc_map, - balancer, - }); - - let l2r_task = { - let assoc = assoc.clone(); - assoc.copy_l2r(receiver) - }; - tokio::spawn(l2r_task); - - (assoc, sender) - } - - async fn copy_l2r(self: Arc, mut receiver: mpsc::Receiver<(SocketAddr, Bytes)>) { - while let Some((target_addr, data)) = receiver.recv().await { - let bypassed = self.context.check_target_bypassed(&Address::from(target_addr)).await; - - trace!( - "udp relay {} -> {} ({}) with {} bytes", - self.peer_addr, - target_addr, - if bypassed { "bypassed" } else { "proxied" }, - data.len() - ); - - let assoc = self.clone(); - if bypassed { - if let Err(err) = assoc.copy_bypassed_l2r(target_addr, &data).await { - error!( - "udp relay {} -> {} (bypassed) with {} bytes, error: {}", - self.peer_addr, - target_addr, - data.len(), - err - ); - } - } else { - if let Err(err) = assoc.copy_proxied_l2r(target_addr, &data).await { - error!( - "udp relay {} -> {} (proxied) with {} bytes, error: {}", - self.peer_addr, - target_addr, - data.len(), - err - ); - } - } - } - } - - async fn copy_bypassed_l2r(self: Arc, target_addr: SocketAddr, data: &[u8]) -> io::Result<()> { - match target_addr { - SocketAddr::V4(..) => self.copy_bypassed_ipv4_l2r(target_addr, data).await, - SocketAddr::V6(..) => self.copy_bypassed_ipv6_l2r(target_addr, data).await, - } - } - - async fn copy_bypassed_ipv4_l2r(self: Arc, target_addr: SocketAddr, data: &[u8]) -> io::Result<()> { - let socket = { - let mut handle = self.bypassed_ipv4_socket.lock(); - - match *handle { - UdpAssociationBypassState::Empty => { - // Create a new connection to proxy server - - let socket = - ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?; - let socket: Arc = Arc::new(socket.into()); - - let (r2l_fut, r2l_abortable) = { - let assoc = self.clone(); - future::abortable(assoc.copy_bypassed_r2l(socket.clone())) - }; - - // CLIENT <- REMOTE - tokio::spawn(r2l_fut); - debug!( - "created udp association for {} (bypassed) with {:?}", - self.peer_addr, - self.context.connect_opts_ref() - ); - - handle.set_connected(socket.clone(), r2l_abortable); - socket - } - UdpAssociationBypassState::Connected { ref socket, .. } => socket.clone(), - UdpAssociationBypassState::Aborted => { - debug!( - "udp association for {} (bypassed) have been aborted, dropped packet {} bytes to {}", - self.peer_addr, - data.len(), - target_addr - ); - return Ok(()); - } - } - }; - - let n = socket.send_to(data, target_addr).await?; - if n != data.len() { - warn!( - "{} -> {} sent {} bytes != expected {} bytes", - self.peer_addr, - target_addr, - n, - data.len() - ); - } - - Ok(()) - } - - async fn copy_bypassed_ipv6_l2r(self: Arc, target_addr: SocketAddr, data: &[u8]) -> io::Result<()> { - let socket = { - let mut handle = self.bypassed_ipv6_socket.lock(); - - match *handle { - UdpAssociationBypassState::Empty => { - // Create a new connection to proxy server - - let socket = - ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?; - let socket: Arc = Arc::new(socket.into()); - - let (r2l_fut, r2l_abortable) = { - let assoc = self.clone(); - future::abortable(assoc.copy_bypassed_r2l(socket.clone())) - }; - - // CLIENT <- REMOTE - tokio::spawn(r2l_fut); - debug!( - "created udp association for {} (bypassed) with {:?}", - self.peer_addr, - self.context.connect_opts_ref() - ); - - handle.set_connected(socket.clone(), r2l_abortable); - socket - } - UdpAssociationBypassState::Connected { ref socket, .. } => socket.clone(), - UdpAssociationBypassState::Aborted => { - debug!( - "udp association for {} (bypassed) have been aborted, dropped packet {} bytes to {}", - self.peer_addr, - data.len(), - target_addr - ); - return Ok(()); - } - } - }; - - let n = socket.send_to(data, target_addr).await?; - if n != data.len() { - warn!( - "{} -> {} sent {} bytes != expected {} bytes", - self.peer_addr, - target_addr, - n, - data.len() - ); - } - - Ok(()) - } - - async fn copy_proxied_l2r(self: Arc, target_addr: SocketAddr, data: &[u8]) -> io::Result<()> { - let mut last_err = io::Error::new(ErrorKind::Other, "udp relay sendto failed after retry"); - - for tried in 0..3 { - let socket = { - let mut handle = self.proxied_socket.lock(); - - match *handle { - UdpAssociationProxyState::Empty => { - // Create a new connection to proxy server - - let server = self.balancer.best_udp_server(); - let svr_cfg = server.server_config(); - - let socket = ProxySocket::connect_with_opts( - self.context.context(), - svr_cfg, - self.context.connect_opts_ref(), - ) - .await?; - let socket = MonProxySocket::from_socket(socket, self.context.flow_stat()); - let socket = Arc::new(socket); - - let (r2l_fut, r2l_abortable) = { - let assoc = self.clone(); - future::abortable(assoc.copy_proxied_r2l(socket.clone())) - }; - - // CLIENT <- REMOTE - tokio::spawn(r2l_fut); - - debug!( - "created udp association for {} <-> {} (proxied) with {:?}", - self.peer_addr, - svr_cfg.addr(), - self.context.connect_opts_ref() - ); - - handle.set_connected(socket.clone(), r2l_abortable); - socket - } - UdpAssociationProxyState::Connected { ref socket, .. } => socket.clone(), - UdpAssociationProxyState::Aborted => { - debug!( - "udp association for {} (proxied) have been aborted, dropped packet {} bytes to {}", - self.peer_addr, - data.len(), - target_addr - ); - return Ok(()); - } - } - }; - - let target_addr = Address::from(target_addr); - match socket.send(&target_addr, data).await { - Ok(..) => return Ok(()), - Err(err) => { - debug!( - "{} -> {} (proxied) sending {} bytes failed, tried: {}, error: {}", - self.peer_addr, - target_addr, - data.len(), - tried + 1, - err - ); - last_err = err; - - // Reset for reconnecting - self.proxied_socket.lock().reset(); - - tokio::task::yield_now().await; - } - } - } - - Err(last_err) - } - - async fn copy_proxied_r2l(self: Arc, outbound: Arc) -> io::Result<()> { - let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE]; - loop { - let (n, addr) = match outbound.recv(&mut buffer).await { - Ok(n) => { - // Keep association alive in map - let _ = self.assoc_map.lock().await.get(&self.peer_addr); - n - } - Err(err) => { - // Socket that connected to remote server returns an error, it should be ECONNREFUSED in most cases. - // That indicates that the association on the server side have been dropped. - // - // There is no point to keep this socket. Drop it immediately. - self.proxied_socket.lock().reset(); - - error!( - "udp failed to receive from proxied outbound socket, peer_addr: {}, error: {}", - self.peer_addr, err - ); - time::sleep(Duration::from_secs(1)).await; - continue; - } - }; - - let data = &buffer[..n]; - - // Create a transparent socket binds to that `addr` and send back to clients - // - // XXX: addr must not be a domain name address - let addr = match addr { - Address::SocketAddress(sa) => sa, - Address::DomainNameAddress(..) => { - error!( - "received proxied packet {} <- {}, redir doesn't allow binding to a domain name address", - self.peer_addr, addr - ); - continue; - } - }; - - // Create a socket binds to destination addr - // This only works for systems that supports binding to non-local addresses - let inbound = match UdpRedirSocket::bind(self.redir_ty, addr) { - Ok(s) => s, - Err(err) => { - error!( - "failed to bind to dest {} for sending back to {}, error: {}", - addr, self.peer_addr, err - ); - continue; - } - }; - - // Send back to client - if let Err(err) = inbound.send_to(data, self.peer_addr).await { - warn!( - "udp failed to send back to client {}, from target {}, error: {}", - self.peer_addr, addr, err - ); - continue; - } - - trace!("udp relay {} <- {} with {} bytes", self.peer_addr, addr, data.len()); - } - } - - async fn copy_bypassed_r2l(self: Arc, outbound: Arc) -> io::Result<()> { - let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE]; - loop { - let (n, addr) = match outbound.recv_from(&mut buffer).await { - Ok(n) => { - // Keep association alive in map - let _ = self.assoc_map.lock().await.get(&self.peer_addr); - n - } - Err(err) => { - error!( - "udp failed to receive from bypass outbound socket, peer_addr: {}, error: {}", - self.peer_addr, err - ); - time::sleep(Duration::from_secs(1)).await; - continue; - } - }; - - let data = &buffer[..n]; - - // Create a socket binds to destination addr - // This only works for systems that supports binding to non-local addresses - let inbound = UdpRedirSocket::bind(self.redir_ty, addr)?; - - // Send back to client - if let Err(err) = inbound.send_to(data, self.peer_addr).await { - warn!( - "udp failed to send back to client {}, from target {}, error: {}", - self.peer_addr, addr, err - ); - } - - trace!("udp relay {} <- {} with {} bytes", self.peer_addr, addr, data.len()); - } - } } diff --git a/crates/shadowsocks-service/src/local/socks/server/mod.rs b/crates/shadowsocks-service/src/local/socks/server/mod.rs index e410d4d057a0..2736359a1227 100644 --- a/crates/shadowsocks-service/src/local/socks/server/mod.rs +++ b/crates/shadowsocks-service/src/local/socks/server/mod.rs @@ -209,7 +209,7 @@ impl Socks { } async fn run_udp_server(&self, client_config: &ClientConfig, balancer: PingBalancer) -> io::Result<()> { - let mut server = Socks5UdpServer::new(self.context.clone(), self.udp_expiry_duration, self.udp_capacity); + let server = Socks5UdpServer::new(self.context.clone(), self.udp_expiry_duration, self.udp_capacity); let udp_bind_addr = self.udp_bind_addr.as_ref().unwrap_or(client_config); server.run(udp_bind_addr, balancer).await diff --git a/crates/shadowsocks-service/src/local/socks/server/socks5/udprelay.rs b/crates/shadowsocks-service/src/local/socks/server/socks5/udprelay.rs index fb800c083627..e0926d680550 100644 --- a/crates/shadowsocks-service/src/local/socks/server/socks5/udprelay.rs +++ b/crates/shadowsocks-service/src/local/socks/server/socks5/udprelay.rs @@ -7,79 +7,69 @@ use std::{ time::Duration, }; +use async_trait::async_trait; use byte_string::ByteStr; -use bytes::{BufMut, Bytes, BytesMut}; -use futures::future::{self, AbortHandle}; -use io::ErrorKind; -use log::{debug, error, info, trace, warn}; -use lru_time_cache::{Entry, LruCache}; +use bytes::{BufMut, BytesMut}; +use log::{error, info, trace}; use shadowsocks::{ lookup_then, net::UdpSocket as ShadowUdpSocket, relay::{ socks5::{Address, UdpAssociateHeader}, - udprelay::{ProxySocket, MAXIMUM_UDP_PAYLOAD_SIZE}, + udprelay::MAXIMUM_UDP_PAYLOAD_SIZE, }, }; -use spin::Mutex as SpinMutex; -use tokio::{ - net::UdpSocket, - sync::{mpsc, Mutex}, - time, -}; +use tokio::{net::UdpSocket, time}; use crate::{ config::ClientConfig, - local::{context::ServiceContext, loadbalancing::PingBalancer}, - net::MonProxySocket, + local::{ + context::ServiceContext, + loadbalancing::PingBalancer, + net::{UdpAssociationManager, UdpInboundWrite}, + }, }; -pub struct Socks5UdpServer { - context: Arc, - assoc_map: Arc>>, - cleanup_abortable: AbortHandle, +#[derive(Clone)] +struct Socks5UdpInboundWriter { + inbound: Arc, } -impl Drop for Socks5UdpServer { - fn drop(&mut self) { - self.cleanup_abortable.abort(); +#[async_trait] +impl UdpInboundWrite for Socks5UdpInboundWriter { + async fn send_to(&self, peer_addr: SocketAddr, remote_addr: &Address, data: &[u8]) -> io::Result<()> { + // Resssemble packet + let mut payload_buffer = BytesMut::new(); + let header = UdpAssociateHeader::new(0, remote_addr.clone()); + payload_buffer.reserve(header.serialized_len() + data.len()); + + header.write_to_buf(&mut payload_buffer); + payload_buffer.put_slice(data); + + self.inbound.send_to(&payload_buffer, peer_addr).await.map(|_| ()) } } +pub struct Socks5UdpServer { + context: Arc, + time_to_live: Option, + capacity: Option, +} + impl Socks5UdpServer { pub fn new( context: Arc, time_to_live: Option, capacity: Option, ) -> Socks5UdpServer { - let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION); - let assoc_map = Arc::new(Mutex::new(match capacity { - Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity), - None => LruCache::with_expiry_duration(time_to_live), - })); - - let cleanup_abortable = { - let assoc_map = assoc_map.clone(); - let (cleanup_task, cleanup_abortable) = future::abortable(async move { - loop { - time::sleep(time_to_live).await; - - // iter() will trigger a cleanup of expired associations - let _ = assoc_map.lock().await.iter(); - } - }); - tokio::spawn(cleanup_task); - cleanup_abortable - }; - Socks5UdpServer { context, - assoc_map, - cleanup_abortable, + time_to_live, + capacity, } } - pub async fn run(&mut self, client_config: &ClientConfig, balancer: PingBalancer) -> io::Result<()> { + pub async fn run(&self, client_config: &ClientConfig, balancer: PingBalancer) -> io::Result<()> { let socket = match *client_config { ClientConfig::SocketAddr(ref saddr) => ShadowUdpSocket::bind(&saddr).await?, ClientConfig::DomainName(ref dname, port) => { @@ -94,6 +84,15 @@ impl Socks5UdpServer { info!("shadowsocks socks5 UDP listening on {}", socket.local_addr()?); let listener = Arc::new(socket); + let manager = UdpAssociationManager::new( + self.context.clone(), + Socks5UdpInboundWriter { + inbound: listener.clone(), + }, + self.time_to_live, + self.capacity, + balancer, + ); let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE]; loop { @@ -133,10 +132,7 @@ impl Socks5UdpServer { payload.len() ); - if let Err(err) = self - .send_packet(&listener, peer_addr, header.address, &balancer, payload) - .await - { + if let Err(err) = manager.send_to(peer_addr, header.address, payload).await { error!( "udp packet from {} relay {} bytes failed, error: {}", peer_addr, @@ -146,511 +142,4 @@ impl Socks5UdpServer { } } } - - async fn send_packet( - &mut self, - listener: &Arc, - peer_addr: SocketAddr, - target_addr: Address, - balancer: &PingBalancer, - data: &[u8], - ) -> io::Result<()> { - match self.assoc_map.lock().await.entry(peer_addr) { - Entry::Occupied(occ) => { - let assoc = occ.into_mut(); - assoc.try_send((target_addr, Bytes::copy_from_slice(data))) - } - Entry::Vacant(vac) => { - let assoc = vac.insert(UdpAssociation::new( - self.context.clone(), - listener.clone(), - peer_addr, - self.assoc_map.clone(), - balancer.clone(), - )); - trace!("created udp association for {}", peer_addr); - assoc.try_send((target_addr, Bytes::copy_from_slice(data))) - } - } - } -} - -struct UdpAssociation { - assoc: Arc, - sender: mpsc::Sender<(Address, Bytes)>, -} - -impl Drop for UdpAssociation { - fn drop(&mut self) { - self.assoc.bypassed_ipv4_socket.lock().abort(); - self.assoc.bypassed_ipv6_socket.lock().abort(); - self.assoc.proxied_socket.lock().abort(); - } -} - -impl UdpAssociation { - fn new( - context: Arc, - inbound: Arc, - peer_addr: SocketAddr, - assoc_map: Arc>>, - balancer: PingBalancer, - ) -> UdpAssociation { - let (assoc, sender) = UdpAssociationContext::new(context, inbound, peer_addr, assoc_map, balancer); - UdpAssociation { assoc, sender } - } - - fn try_send(&self, data: (Address, Bytes)) -> io::Result<()> { - if let Err(..) = self.sender.try_send(data) { - let err = io::Error::new(ErrorKind::Other, "udp relay channel full"); - return Err(err); - } - Ok(()) - } -} - -enum UdpAssociationBypassState { - Empty, - Connected { - socket: Arc, - abortable: AbortHandle, - }, - Aborted, -} - -impl Drop for UdpAssociationBypassState { - fn drop(&mut self) { - if let UdpAssociationBypassState::Connected { ref abortable, .. } = *self { - abortable.abort(); - } - } -} - -impl UdpAssociationBypassState { - fn empty() -> UdpAssociationBypassState { - UdpAssociationBypassState::Empty - } - - fn set_connected(&mut self, socket: Arc, abortable: AbortHandle) { - *self = UdpAssociationBypassState::Connected { socket, abortable }; - } - - fn abort(&mut self) { - *self = UdpAssociationBypassState::Aborted; - } -} - -enum UdpAssociationProxyState { - Empty, - Connected { - socket: Arc, - abortable: AbortHandle, - }, - Aborted, -} - -impl Drop for UdpAssociationProxyState { - fn drop(&mut self) { - if let UdpAssociationProxyState::Connected { ref abortable, .. } = *self { - abortable.abort(); - } - } -} - -impl UdpAssociationProxyState { - fn empty() -> UdpAssociationProxyState { - UdpAssociationProxyState::Empty - } - - fn reset(&mut self) { - *self = UdpAssociationProxyState::Empty; - } - - fn set_connected(&mut self, socket: Arc, abortable: AbortHandle) { - *self = UdpAssociationProxyState::Connected { socket, abortable }; - } - - fn abort(&mut self) { - *self = UdpAssociationProxyState::Aborted; - } -} - -struct UdpAssociationContext { - context: Arc, - inbound: Arc, - peer_addr: SocketAddr, - bypassed_ipv4_socket: SpinMutex, - bypassed_ipv6_socket: SpinMutex, - proxied_socket: SpinMutex, - assoc_map: Arc>>, - balancer: PingBalancer, -} - -impl Drop for UdpAssociationContext { - fn drop(&mut self) { - trace!("udp association for {} is closed", self.peer_addr); - } -} - -impl UdpAssociationContext { - fn new( - context: Arc, - inbound: Arc, - peer_addr: SocketAddr, - assoc_map: Arc>>, - balancer: PingBalancer, - ) -> (Arc, mpsc::Sender<(Address, Bytes)>) { - // Pending packets 1024 should be good enough for a server. - // If there are plenty of packets stuck in the channel, dropping exccess packets is a good way to protect the server from - // being OOM. - let (sender, receiver) = mpsc::channel(1024); - - let assoc = Arc::new(UdpAssociationContext { - context, - inbound, - peer_addr, - bypassed_ipv4_socket: SpinMutex::new(UdpAssociationBypassState::empty()), - bypassed_ipv6_socket: SpinMutex::new(UdpAssociationBypassState::empty()), - proxied_socket: SpinMutex::new(UdpAssociationProxyState::empty()), - assoc_map, - balancer, - }); - - let l2r_task = { - let assoc = assoc.clone(); - assoc.copy_l2r(receiver) - }; - tokio::spawn(l2r_task); - - (assoc, sender) - } - - async fn copy_l2r(self: Arc, mut receiver: mpsc::Receiver<(Address, Bytes)>) { - while let Some((target_addr, data)) = receiver.recv().await { - let bypassed = self.context.check_target_bypassed(&target_addr).await; - - trace!( - "udp relay {} -> {} ({}) with {} bytes", - self.peer_addr, - target_addr, - if bypassed { "bypassed" } else { "proxied" }, - data.len() - ); - - let assoc = self.clone(); - if bypassed { - if let Err(err) = assoc.copy_bypassed_l2r(&target_addr, &data).await { - error!( - "udp relay {} -> {} (bypassed) with {} bytes, error: {}", - self.peer_addr, - target_addr, - data.len(), - err - ); - } - } else { - if let Err(err) = assoc.copy_proxied_l2r(&target_addr, &data).await { - error!( - "udp relay {} -> {} (proxied) with {} bytes, error: {}", - self.peer_addr, - target_addr, - data.len(), - err - ); - } - } - } - } - - async fn copy_bypassed_l2r(self: Arc, target_addr: &Address, data: &[u8]) -> io::Result<()> { - match *target_addr { - Address::SocketAddress(sa) => match sa { - SocketAddr::V4(..) => self.copy_bypassed_ipv4_l2r(sa, data).await, - SocketAddr::V6(..) => self.copy_bypassed_ipv6_l2r(sa, data).await, - }, - Address::DomainNameAddress(ref dname, port) => { - lookup_then!(self.context.context_ref(), dname, port, |sa| { - match sa { - SocketAddr::V4(..) => self.clone().copy_bypassed_ipv4_l2r(sa, data).await, - SocketAddr::V6(..) => self.clone().copy_bypassed_ipv6_l2r(sa, data).await, - } - }) - .map(|_| ()) - } - } - } - - async fn copy_bypassed_ipv4_l2r(self: Arc, target_addr: SocketAddr, data: &[u8]) -> io::Result<()> { - let socket = { - let mut handle = self.bypassed_ipv4_socket.lock(); - - match *handle { - UdpAssociationBypassState::Empty => { - // Create a new connection to proxy server - - let socket = - ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?; - let socket: Arc = Arc::new(socket.into()); - - let (r2l_fut, r2l_abortable) = { - let assoc = self.clone(); - future::abortable(assoc.copy_bypassed_r2l(socket.clone())) - }; - - // CLIENT <- REMOTE - tokio::spawn(r2l_fut); - debug!( - "created udp association for {} (bypassed) with {:?}", - self.peer_addr, - self.context.connect_opts_ref() - ); - - handle.set_connected(socket.clone(), r2l_abortable); - socket - } - UdpAssociationBypassState::Connected { ref socket, .. } => socket.clone(), - UdpAssociationBypassState::Aborted => { - debug!( - "udp association for {} (bypassed) have been aborted, dropped packet {} bytes to {}", - self.peer_addr, - data.len(), - target_addr - ); - return Ok(()); - } - } - }; - - let n = socket.send_to(data, target_addr).await?; - if n != data.len() { - warn!( - "{} -> {} sent {} bytes != expected {} bytes", - self.peer_addr, - target_addr, - n, - data.len() - ); - } - - Ok(()) - } - - async fn copy_bypassed_ipv6_l2r(self: Arc, target_addr: SocketAddr, data: &[u8]) -> io::Result<()> { - let socket = { - let mut handle = self.bypassed_ipv6_socket.lock(); - - match *handle { - UdpAssociationBypassState::Empty => { - // Create a new connection to proxy server - - let socket = - ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?; - let socket: Arc = Arc::new(socket.into()); - - let (r2l_fut, r2l_abortable) = { - let assoc = self.clone(); - future::abortable(assoc.copy_bypassed_r2l(socket.clone())) - }; - - // CLIENT <- REMOTE - tokio::spawn(r2l_fut); - debug!( - "created udp association for {} (bypassed) with {:?}", - self.peer_addr, - self.context.connect_opts_ref() - ); - - handle.set_connected(socket.clone(), r2l_abortable); - socket - } - UdpAssociationBypassState::Connected { ref socket, .. } => socket.clone(), - UdpAssociationBypassState::Aborted => { - debug!( - "udp association for {} (bypassed) have been aborted, dropped packet {} bytes to {}", - self.peer_addr, - data.len(), - target_addr - ); - return Ok(()); - } - } - }; - - let n = socket.send_to(data, target_addr).await?; - if n != data.len() { - warn!( - "{} -> {} sent {} bytes != expected {} bytes", - self.peer_addr, - target_addr, - n, - data.len() - ); - } - - Ok(()) - } - - async fn copy_proxied_l2r(self: Arc, target_addr: &Address, data: &[u8]) -> io::Result<()> { - let mut last_err = io::Error::new(ErrorKind::Other, "udp relay sendto failed after retry"); - - for tried in 0..3 { - let socket = { - let mut handle = self.proxied_socket.lock(); - - match *handle { - UdpAssociationProxyState::Empty => { - // Create a new connection to proxy server - - let server = self.balancer.best_udp_server(); - let svr_cfg = server.server_config(); - - let socket = ProxySocket::connect_with_opts( - self.context.context(), - svr_cfg, - self.context.connect_opts_ref(), - ) - .await?; - let socket = MonProxySocket::from_socket(socket, self.context.flow_stat()); - let socket = Arc::new(socket); - - let (r2l_fut, r2l_abortable) = { - let assoc = self.clone(); - future::abortable(assoc.copy_proxied_r2l(socket.clone())) - }; - - // CLIENT <- REMOTE - tokio::spawn(r2l_fut); - - debug!( - "created udp association for {} <-> {} (proxied) with {:?}", - self.peer_addr, - svr_cfg.addr(), - self.context.connect_opts_ref() - ); - - handle.set_connected(socket.clone(), r2l_abortable); - socket - } - UdpAssociationProxyState::Connected { ref socket, .. } => socket.clone(), - UdpAssociationProxyState::Aborted => { - debug!( - "udp association for {} (proxied) have been aborted, dropped packet {} bytes to {}", - self.peer_addr, - data.len(), - target_addr - ); - return Ok(()); - } - } - }; - - match socket.send(target_addr, data).await { - Ok(..) => return Ok(()), - Err(err) => { - debug!( - "{} -> {} (proxied) sending {} bytes failed, tried: {}, error: {}", - self.peer_addr, - target_addr, - data.len(), - tried + 1, - err - ); - last_err = err; - - // Reset for reconnecting - self.proxied_socket.lock().reset(); - - tokio::task::yield_now().await; - } - } - } - - Err(last_err) - } - - async fn copy_proxied_r2l(self: Arc, outbound: Arc) -> io::Result<()> { - let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE]; - let mut payload_buffer = BytesMut::new(); - loop { - let (n, addr) = match outbound.recv(&mut buffer).await { - Ok(n) => { - // Keep association alive in map - let _ = self.assoc_map.lock().await.get(&self.peer_addr); - n - } - Err(err) => { - // Socket that connected to remote server returns an error, it should be ECONNREFUSED in most cases. - // That indicates that the association on the server side have been dropped. - // - // There is no point to keep this socket. Drop it immediately. - self.proxied_socket.lock().reset(); - - error!( - "udp failed to receive from proxied outbound socket, peer_addr: {}, error: {}", - self.peer_addr, err - ); - time::sleep(Duration::from_secs(1)).await; - continue; - } - }; - - let data = &buffer[..n]; - payload_buffer.clear(); - - // Resssemble packet - let header = UdpAssociateHeader::new(0, addr.clone()); - payload_buffer.reserve(header.serialized_len() + n); - - header.write_to_buf(&mut payload_buffer); - payload_buffer.put_slice(data); - - // Send back to client - if let Err(err) = self.inbound.send_to(&payload_buffer, self.peer_addr).await { - warn!( - "udp failed to send back to client {}, from target {}, error: {}", - self.peer_addr, addr, err - ); - } - - trace!( - "udp relay {} <- {} with {} bytes", - self.peer_addr, - addr, - payload_buffer.len() - ); - } - } - - async fn copy_bypassed_r2l(self: Arc, outbound: Arc) -> io::Result<()> { - let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE]; - loop { - let (n, addr) = match outbound.recv_from(&mut buffer).await { - Ok(n) => { - // Keep association alive in map - let _ = self.assoc_map.lock().await.get(&self.peer_addr); - n - } - Err(err) => { - error!( - "udp failed to receive from bypass outbound socket, peer_addr: {}, error: {}", - self.peer_addr, err - ); - time::sleep(Duration::from_secs(1)).await; - continue; - } - }; - - let data = &buffer[..n]; - - // Send back to client - if let Err(err) = self.inbound.send_to(data, self.peer_addr).await { - warn!( - "udp failed to send back to client {}, from target {}, error: {}", - self.peer_addr, addr, err - ); - } - - trace!("udp relay {} <- {} with {} bytes", self.peer_addr, addr, data.len()); - } - } } diff --git a/crates/shadowsocks/src/lib.rs b/crates/shadowsocks/src/lib.rs index 6d8a7ea71ded..f5f348ade7f3 100644 --- a/crates/shadowsocks/src/lib.rs +++ b/crates/shadowsocks/src/lib.rs @@ -1,3 +1,5 @@ +//! Shadowsocks Core Library + #![crate_type = "lib"] pub use self::{