From 93af5c35697ba5d5c0eaed163b98d12e28ab6097 Mon Sep 17 00:00:00 2001 From: Mariappan Ramasamy <142216110+kp-mariappan-ramasamy@users.noreply.github.com> Date: Tue, 23 Jul 2024 10:46:20 +0800 Subject: [PATCH] Add support for concurrent send/recv apis for tun device (#87) * Add send and recv apis similar to Tokio::UdpSocket Tokio supports async api's for UDP socket which does not need `&mut self` pub async fn recv(&self, buf: &mut [u8]) -> Result pub async fn send(&self, buf: &[u8]) -> Result This makes it pretty easy to call it concurrently, without adding Arc> to UdpSocket. This PR tries to add support for similar apis to tun device. In this commit, only sync version of recv/send apis are added. Async version will be added in further commit. Reference: https://docs.rs/tokio/latest/tokio/net/struct.UdpSocket.html#method.recv https://docs.rs/tokio/latest/tokio/net/struct.UdpSocket.html#method.send * Refactor to use STACK_BUF_LEN as const * Add async unix send/recv apis for unix Since unix uses AsyncFd from tokio, it is easier to wait for Ready on AsyncFd and call sync send/recv apis This commit does not take care of windows, which is not as straight-forward as posix variants. --- src/async/unix_device.rs | 19 ++++++++++ src/async/win_device.rs | 10 +++++ src/platform/android/device.rs | 10 +++++ src/platform/freebsd/device.rs | 10 +++++ src/platform/ios/device.rs | 10 +++++ src/platform/linux/device.rs | 10 +++++ src/platform/macos/device.rs | 10 +++++ src/platform/posix/split.rs | 68 ++++++++++++++++++++++++++++++++++ src/platform/windows/device.rs | 20 ++++++++++ 9 files changed, 167 insertions(+) diff --git a/src/async/unix_device.rs b/src/async/unix_device.rs index 8c5530de..4403125d 100644 --- a/src/async/unix_device.rs +++ b/src/async/unix_device.rs @@ -17,6 +17,7 @@ use core::task::{Context, Poll}; use futures_core::ready; use std::io::{IoSlice, Read, Write}; use tokio::io::unix::AsyncFd; +use tokio::io::Interest; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio_util::codec::Framed; @@ -59,6 +60,24 @@ impl AsyncDevice { // associate mtu with the capacity of ReadBuf Framed::with_capacity(self, codec, mtu as usize) } + + /// Recv a packet from tun device + pub async fn recv(&self, buf: &mut [u8]) -> std::io::Result { + let guard = self.inner.readable().await?; + guard + .get_ref() + .async_io(Interest::READABLE, |inner| inner.recv(buf)) + .await + } + + /// Send a packet to tun device + pub async fn send(&self, buf: &[u8]) -> std::io::Result { + let guard = self.inner.writable().await?; + guard + .get_ref() + .async_io(Interest::WRITABLE, |inner| inner.send(buf)) + .await + } } impl AsyncRead for AsyncDevice { diff --git a/src/async/win_device.rs b/src/async/win_device.rs index d39bcd26..541a0d0b 100644 --- a/src/async/win_device.rs +++ b/src/async/win_device.rs @@ -60,6 +60,16 @@ impl AsyncDevice { // guarantee to avoid the mtu of wintun may far away larger than the default provided capacity of ReadBuf of Framed Framed::with_capacity(self, codec, mtu as usize) } + + /// Recv a packet from tun device - Not implemented for windows + pub async fn recv(&self, _buf: &mut [u8]) -> std::io::Result { + unimplemented!() + } + + /// Send a packet to tun device - Not implemented for windows + pub async fn send(&self, _buf: &[u8]) -> std::io::Result { + unimplemented!() + } } impl AsyncRead for AsyncDevice { diff --git a/src/platform/android/device.rs b/src/platform/android/device.rs index ca295607..e8c144b7 100644 --- a/src/platform/android/device.rs +++ b/src/platform/android/device.rs @@ -68,6 +68,16 @@ impl Device { pub fn set_nonblock(&self) -> std::io::Result<()> { self.tun.set_nonblock() } + + /// Recv a packet from tun device + pub fn recv(&self, buf: &mut [u8]) -> io::Result { + self.tun.recv(buf) + } + + /// Send a packet to tun device + pub fn send(&self, buf: &[u8]) -> io::Result { + self.tun.send(buf) + } } impl Read for Device { diff --git a/src/platform/freebsd/device.rs b/src/platform/freebsd/device.rs index ea3cc9b2..cc81e7ba 100644 --- a/src/platform/freebsd/device.rs +++ b/src/platform/freebsd/device.rs @@ -243,6 +243,16 @@ impl Device { self.route = Some(route); Ok(()) } + + /// Recv a packet from tun device + pub fn recv(&self, buf: &mut [u8]) -> io::Result { + self.tun.recv(buf) + } + + /// Send a packet to tun device + pub fn send(&self, buf: &[u8]) -> io::Result { + self.tun.send(buf) + } } impl Read for Device { diff --git a/src/platform/ios/device.rs b/src/platform/ios/device.rs index d8506a13..5c788b5b 100644 --- a/src/platform/ios/device.rs +++ b/src/platform/ios/device.rs @@ -70,6 +70,16 @@ impl Device { pub fn set_nonblock(&self) -> std::io::Result<()> { self.tun.set_nonblock() } + + /// Recv a packet from tun device + pub fn recv(&self, buf: &mut [u8]) -> io::Result { + self.tun.recv(buf) + } + + /// Send a packet to tun device + pub fn send(&self, buf: &[u8]) -> io::Result { + self.tun.send(buf) + } } impl Read for Device { diff --git a/src/platform/linux/device.rs b/src/platform/linux/device.rs index 132656c1..194dd92e 100644 --- a/src/platform/linux/device.rs +++ b/src/platform/linux/device.rs @@ -186,6 +186,16 @@ impl Device { pub fn set_nonblock(&self) -> io::Result<()> { self.tun.set_nonblock() } + + /// Recv a packet from tun device + pub fn recv(&self, buf: &mut [u8]) -> io::Result { + self.tun.recv(buf) + } + + /// Send a packet to tun device + pub fn send(&self, buf: &[u8]) -> io::Result { + self.tun.send(buf) + } } impl Read for Device { diff --git a/src/platform/macos/device.rs b/src/platform/macos/device.rs index 9f0bfd5c..df869996 100644 --- a/src/platform/macos/device.rs +++ b/src/platform/macos/device.rs @@ -274,6 +274,16 @@ impl Device { self.route = Some(route); Ok(()) } + + /// Recv a packet from tun device + pub fn recv(&self, buf: &mut [u8]) -> io::Result { + self.tun.recv(buf) + } + + /// Send a packet to tun device + pub fn send(&self, buf: &[u8]) -> io::Result { + self.tun.send(buf) + } } impl Read for Device { diff --git a/src/platform/posix/split.rs b/src/platform/posix/split.rs index 0475b6d2..30f802cf 100644 --- a/src/platform/posix/split.rs +++ b/src/platform/posix/split.rs @@ -68,12 +68,39 @@ pub struct Reader { pub(crate) fd: Arc, pub(crate) offset: usize, pub(crate) buf: Vec, + pub(crate) mtu: u16, } impl Reader { pub(crate) fn set_mtu(&mut self, value: u16) { + self.mtu = value; self.buf.resize(value as usize + self.offset, 0); } + + pub(crate) fn recv(&self, mut in_buf: &mut [u8]) -> io::Result { + const STACK_BUF_LEN: usize = crate::DEFAULT_MTU as usize + PIL; + let in_buf_len = in_buf.len() + self.offset; + + // The following logic is to prevent dynamically allocating Vec on every recv + // As long as the MTU is set to value lesser than 1500, this api uses `stack_buf` + // and avoids `Vec` allocation + let local_buf = if in_buf_len > STACK_BUF_LEN && self.offset != 0 { + &mut vec![0u8; in_buf_len][..] + } else { + &mut [0u8; STACK_BUF_LEN] + }; + + let either_buf = if self.offset != 0 { + &mut *local_buf + } else { + &mut *in_buf + }; + let amount = self.fd.read(either_buf)?; + if self.offset != 0 { + in_buf.put_slice(&local_buf[self.offset..amount]); + } + Ok(amount - self.offset) + } } impl Read for Reader { @@ -106,12 +133,43 @@ pub struct Writer { pub(crate) fd: Arc, pub(crate) offset: usize, pub(crate) buf: Vec, + pub(crate) mtu: u16, } impl Writer { pub(crate) fn set_mtu(&mut self, value: u16) { + self.mtu = value; self.buf.resize(value as usize + self.offset, 0); } + + pub(crate) fn send(&self, in_buf: &[u8]) -> io::Result { + const STACK_BUF_LEN: usize = crate::DEFAULT_MTU as usize + PIL; + let in_buf_len = in_buf.len() + self.offset; + + // The following logic is to prevent dynamically allocating Vec on every send + // As long as the MTU is set to value lesser than 1500, this api uses `stack_buf` + // and avoids `Vec` allocation + let local_buf = if in_buf_len > STACK_BUF_LEN && self.offset != 0 { + &mut vec![0u8; in_buf_len][..] + } else { + &mut [0u8; STACK_BUF_LEN] + }; + + let either_buf = if self.offset != 0 { + let ipv6 = is_ipv6(in_buf)?; + if let Some(header) = generate_packet_information(true, ipv6) { + (&mut local_buf[..self.offset]).put_slice(header.as_ref()); + (&mut local_buf[self.offset..in_buf_len]).put_slice(in_buf); + local_buf + } else { + in_buf + } + } else { + in_buf + }; + let amount = self.fd.write(either_buf)?; + Ok(amount - self.offset) + } } impl Write for Writer { @@ -163,11 +221,13 @@ impl Tun { fd: fd.clone(), offset, buf: vec![0; mtu as usize + offset], + mtu, }, writer: Writer { fd, offset, buf: vec![0; mtu as usize + offset], + mtu, }, mtu, packet_information, @@ -191,6 +251,14 @@ impl Tun { pub fn packet_information(&self) -> bool { self.packet_information } + + pub fn recv(&self, buf: &mut [u8]) -> io::Result { + self.reader.recv(buf) + } + + pub fn send(&self, buf: &[u8]) -> io::Result { + self.writer.send(buf) + } } impl Read for Tun { diff --git a/src/platform/windows/device.rs b/src/platform/windows/device.rs index 86e0116c..6f4e579a 100644 --- a/src/platform/windows/device.rs +++ b/src/platform/windows/device.rs @@ -88,6 +88,16 @@ impl Device { let tun = Arc::new(self.tun); (Reader(tun.clone()), Writer(tun)) } + + /// Recv a packet from tun device + pub fn recv(&self, buf: &mut [u8]) -> io::Result { + self.tun.recv(buf) + } + + /// Send a packet to tun device + pub fn send(&self, buf: &[u8]) -> io::Result { + self.tun.send(buf) + } } impl Read for Device { @@ -248,6 +258,16 @@ impl Tun { }, } } + + /// Recv a packet from tun device + pub fn recv(&self, buf: &mut [u8]) -> io::Result { + self.read_by_ref(buf) + } + + /// Send a packet to tun device + pub fn send(&self, buf: &[u8]) -> io::Result { + self.write_by_ref(buf) + } } impl Read for Tun {