diff --git a/Cargo.lock b/Cargo.lock index 52c2bc211238..6068a06c79fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1208,7 +1208,7 @@ dependencies = [ [[package]] name = "shadowsocks-rust" -version = "1.7.0-alpha.18" +version = "1.7.0-alpha.19" dependencies = [ "base64 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)", "byte_string 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/src/relay/dns_resolver.rs b/src/relay/dns_resolver.rs index c5618193fc5c..8714dc580a71 100644 --- a/src/relay/dns_resolver.rs +++ b/src/relay/dns_resolver.rs @@ -54,24 +54,23 @@ fn inner_resolve( port: u16, check_forbidden: bool, ) -> impl Future, Error = io::Error> + Send { - let owned_addr = addr.to_owned(); - let owned_addr2 = owned_addr.clone(); + // let owned_addr = addr.to_owned(); let cloned_context = context.clone(); - - context - .dns_resolver() - .lookup_ip(addr) - .map_err(move |err| { - error!("Failed to resolve {}, err: {}", owned_addr2, err); - io::Error::new(io::ErrorKind::Other, "dns resolve error") - }) - .and_then(move |lookup_result| { + context.dns_resolver().lookup_ip(addr).then(move |r| match r { + Err(err) => { + // error!("Failed to resolve {}, err: {}", owned_addr, err); + Err(io::Error::new( + io::ErrorKind::Other, + format!("dns resolve error: {}", err), + )) + } + Ok(lookup_result) => { let mut vaddr = Vec::new(); for ip in lookup_result.iter() { if check_forbidden { let forbidden_ip = &cloned_context.config().forbidden_ip; if forbidden_ip.contains(&ip) { - debug!("Resolved {} => {}, which is skipped by forbidden_ip", owned_addr, ip); + // debug!("Resolved {} => {}, which is skipped by forbidden_ip", owned_addr, ip); continue; } } @@ -81,14 +80,16 @@ fn inner_resolve( if vaddr.is_empty() { let err = io::Error::new( ErrorKind::Other, - format!("resolved {} to empty address, all IPs are filtered", owned_addr), + // format!("resolved {} to empty address, all IPs are filtered", owned_addr), + "resolved to empty address, all IPs are filtered", ); Err(err) } else { - debug!("Resolved {} => {:?}", owned_addr, vaddr); + // debug!("Resolved {} => {:?}", owned_addr, vaddr); Ok(vaddr) } - }) + } + }) } /// Resolve address to IP diff --git a/src/relay/tcprelay/mod.rs b/src/relay/tcprelay/mod.rs index c178a18385ff..b3c588c33830 100644 --- a/src/relay/tcprelay/mod.rs +++ b/src/relay/tcprelay/mod.rs @@ -47,14 +47,14 @@ mod utils; const BUFFER_SIZE: usize = 8 * 1024; // 8K buffer -/// Directions in the tunnel -#[derive(Debug, Copy, Clone)] -pub enum TunnelDirection { - /// Client -> Server - Client2Server, - /// Client <- Server - Server2Client, -} +// /// Directions in the tunnel +// #[derive(Debug, Copy, Clone)] +// pub enum TunnelDirection { +// /// Client -> Server +// Client2Server, +// /// Client <- Server +// Server2Client, +// } type TcpReadHalf = ReadHalf; type TcpWriteHalf = WriteHalf; @@ -366,52 +366,58 @@ pub fn proxy_handshake( } /// Establish tunnel between server and client -pub fn tunnel(addr: Address, c2s: CF, s2c: SF) -> impl Future + Send +// pub fn tunnel(addr: Address, c2s: CF, s2c: SF) -> impl Future + Send +pub fn tunnel(c2s: CF, s2c: SF) -> impl Future + Send where CF: Future + Send + 'static, SF: Future + Send + 'static, { - let addr = Arc::new(addr); - - let cloned_addr = addr.clone(); - let c2s = c2s.then(move |res| { - match res { - Ok(..) => { - // Continue reading response from remote server - trace!("Relay {} client -> server is finished", cloned_addr); - - Ok(TunnelDirection::Client2Server) - } - Err(err) => { - error!("Relay {} client -> server aborted: {}", cloned_addr, err); - Err(err) - } - } - }); - - let cloned_addr = addr.clone(); - let s2c = s2c.then(move |res| match res { - Ok(..) => { - trace!("Relay {} client <- server is finished", cloned_addr); - - Ok(TunnelDirection::Server2Client) - } - Err(err) => { - error!("Relay {} client <- server aborted: {}", cloned_addr, err); - Err(err) - } - }); - - c2s.select(s2c) - .and_then(move |(dir, _)| { - match dir { - TunnelDirection::Server2Client => trace!("Relay {} client <- server is closed, abort connection", addr), - TunnelDirection::Client2Server => trace!("Relay {} server -> client is closed, abort connection", addr), - } + c2s.map(|_| ()).select(s2c.map(|_| ())).then(|r| match r { + Ok(..) => Ok(()), + Err((err, _)) => Err(err), + }) - Ok(()) - }) - .map_err(|(err, _)| err) + // let addr = Arc::new(addr); + + // let cloned_addr = addr.clone(); + // let c2s = c2s.then(move |res| { + // match res { + // Ok(..) => { + // // Continue reading response from remote server + // trace!("Relay {} client -> server is finished", cloned_addr); + + // Ok(TunnelDirection::Client2Server) + // } + // Err(err) => { + // error!("Relay {} client -> server aborted: {}", cloned_addr, err); + // Err(err) + // } + // } + // }); + + // let cloned_addr = addr.clone(); + // let s2c = s2c.then(move |res| match res { + // Ok(..) => { + // trace!("Relay {} client <- server is finished", cloned_addr); + + // Ok(TunnelDirection::Server2Client) + // } + // Err(err) => { + // error!("Relay {} client <- server aborted: {}", cloned_addr, err); + // Err(err) + // } + // }); + + // c2s.select(s2c) + // .and_then(move |(dir, _)| { + // match dir { + // TunnelDirection::Server2Client => trace!("Relay {} client <- server is closed, abort connection", addr), + // TunnelDirection::Client2Server => trace!("Relay {} server -> client is closed, abort connection", addr), + // } + + // Ok(()) + // }) + // .map_err(|(err, _)| err) } /// Read until EOF, and ignore diff --git a/src/relay/tcprelay/server.rs b/src/relay/tcprelay/server.rs index 88e97e5283ca..0cb34129c278 100644 --- a/src/relay/tcprelay/server.rs +++ b/src/relay/tcprelay/server.rs @@ -144,13 +144,11 @@ where Item = TcpRelayClientConnected + Send + 'static>, Error = io::Error, > + Send { - let addr = self.addr.clone(); let client_pair = (self.r, self.w); let timeout = self.timeout; connect_remote(self.context, self.addr, self.timeout).map(move |stream| TcpRelayClientConnected { server: stream.split(), client: client_pair, - addr: addr, timeout: timeout, }) } @@ -163,7 +161,6 @@ where { server: (ReadHalf, WriteHalf), client: (DecryptedHalf, E), - addr: Address, timeout: Option, } @@ -179,7 +176,6 @@ where let timeout = self.timeout; tunnel( - self.addr, r.copy_timeout_opt(svr_w, self.timeout), w_fut.and_then(move |w| w.copy_timeout_opt(svr_r, timeout)), ) diff --git a/src/relay/tcprelay/socks5_local.rs b/src/relay/tcprelay/socks5_local.rs index 07d5e995535c..2f7d0ba1d50d 100644 --- a/src/relay/tcprelay/socks5_local.rs +++ b/src/relay/tcprelay/socks5_local.rs @@ -38,10 +38,8 @@ fn handle_socks5_connect( addr: Address, svr_cfg: Arc, ) -> impl Future + Send { - let cloned_addr = addr.clone(); - let cloned_svr_cfg = svr_cfg.clone(); let timeout = svr_cfg.timeout(); - super::connect_proxy_server(context.clone(), svr_cfg) + super::connect_proxy_server(context, svr_cfg.clone()) .then(move |res| { let (header, r) = match res { Ok(svr_s) => { @@ -70,21 +68,18 @@ fn handle_socks5_connect( } }; - trace!("Send header: {:?}", header); - try_timeout(try_timeout(header.write_to(w), timeout).and_then(flush), timeout).and_then(|w| match r { + trace!("Sent header: {:?}", header); + try_timeout(header.write_to(w).and_then(flush), timeout).and_then(|w| match r { Ok(svr_s) => Ok((svr_s, w)), Err(err) => Err(err), }) }) .and_then(move |(svr_s, w)| { - let svr_cfg = cloned_svr_cfg; let timeout = svr_cfg.timeout(); super::proxy_server_handshake(svr_s, svr_cfg, addr).and_then(move |(svr_r, svr_w)| { - let cloned_timeout = timeout; let rhalf = svr_r.and_then(move |svr_r| svr_r.copy_timeout_opt(w, timeout)); - let whalf = svr_w.and_then(move |svr_w| svr_w.copy_timeout_opt(r, cloned_timeout)); - - tunnel(cloned_addr, whalf, rhalf) + let whalf = svr_w.and_then(move |svr_w| svr_w.copy_timeout_opt(r, timeout)); + tunnel(whalf, rhalf) }) }) } @@ -154,7 +149,11 @@ fn handle_socks5_client( match header.command { socks5::Command::TcpConnect => { debug!("CONNECT {}", addr); - let fut = handle_socks5_connect(context, (r, w), cloned_client_addr, addr, conf); + let fut = + handle_socks5_connect(context, (r, w), cloned_client_addr, addr.clone(), conf).map_err(move |err| { + error!("CONNECT {} failed with error: {}", addr, err); + err + }); boxed_future(fut) } socks5::Command::TcpBind => {