Skip to content

Commit

Permalink
Add support for concurrent send/recv apis for tun device (meh#87)
Browse files Browse the repository at this point in the history
* Add send and recv apis similar to Tokio::UdpSocket

Tokio supports async api's for UDP socket which does not need `&mut self`

pub async fn recv(&self, buf: &mut [u8]) -> Result<usize>
pub async fn send(&self, buf: &[u8]) -> Result<usize>

This makes it pretty easy to call it concurrently, without adding Arc<Mutex<T>> to
UdpSocket.

This PR tries to add support for similar apis to tun device.
In this commit, only sync version of recv/send apis are added. Async version will be added in
further commit.

Reference:
https://docs.rs/tokio/latest/tokio/net/struct.UdpSocket.html#method.recv
https://docs.rs/tokio/latest/tokio/net/struct.UdpSocket.html#method.send

* Refactor to use STACK_BUF_LEN as const

* Add async unix send/recv apis for unix

Since unix uses AsyncFd from tokio, it is easier to wait for Ready
on AsyncFd and call sync send/recv apis

This commit does not take care of windows, which is not as straight-forward as posix variants.
  • Loading branch information
kp-mariappan-ramasamy authored Jul 23, 2024
1 parent 777ba78 commit 93af5c3
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/async/unix_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use core::task::{Context, Poll};
use futures_core::ready;
use std::io::{IoSlice, Read, Write};
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_util::codec::Framed;

Expand Down Expand Up @@ -59,6 +60,24 @@ impl AsyncDevice {
// associate mtu with the capacity of ReadBuf
Framed::with_capacity(self, codec, mtu as usize)
}

/// Recv a packet from tun device
pub async fn recv(&self, buf: &mut [u8]) -> std::io::Result<usize> {
let guard = self.inner.readable().await?;
guard
.get_ref()
.async_io(Interest::READABLE, |inner| inner.recv(buf))
.await
}

/// Send a packet to tun device
pub async fn send(&self, buf: &[u8]) -> std::io::Result<usize> {
let guard = self.inner.writable().await?;
guard
.get_ref()
.async_io(Interest::WRITABLE, |inner| inner.send(buf))
.await
}
}

impl AsyncRead for AsyncDevice {
Expand Down
10 changes: 10 additions & 0 deletions src/async/win_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ impl AsyncDevice {
// guarantee to avoid the mtu of wintun may far away larger than the default provided capacity of ReadBuf of Framed
Framed::with_capacity(self, codec, mtu as usize)
}

/// Recv a packet from tun device - Not implemented for windows
pub async fn recv(&self, _buf: &mut [u8]) -> std::io::Result<usize> {
unimplemented!()
}

/// Send a packet to tun device - Not implemented for windows
pub async fn send(&self, _buf: &[u8]) -> std::io::Result<usize> {
unimplemented!()
}
}

impl AsyncRead for AsyncDevice {
Expand Down
10 changes: 10 additions & 0 deletions src/platform/android/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ impl Device {
pub fn set_nonblock(&self) -> std::io::Result<()> {
self.tun.set_nonblock()
}

/// Recv a packet from tun device
pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.tun.recv(buf)
}

/// Send a packet to tun device
pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.tun.send(buf)
}
}

impl Read for Device {
Expand Down
10 changes: 10 additions & 0 deletions src/platform/freebsd/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,16 @@ impl Device {
self.route = Some(route);
Ok(())
}

/// Recv a packet from tun device
pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.tun.recv(buf)
}

/// Send a packet to tun device
pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.tun.send(buf)
}
}

impl Read for Device {
Expand Down
10 changes: 10 additions & 0 deletions src/platform/ios/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ impl Device {
pub fn set_nonblock(&self) -> std::io::Result<()> {
self.tun.set_nonblock()
}

/// Recv a packet from tun device
pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.tun.recv(buf)
}

/// Send a packet to tun device
pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.tun.send(buf)
}
}

impl Read for Device {
Expand Down
10 changes: 10 additions & 0 deletions src/platform/linux/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ impl Device {
pub fn set_nonblock(&self) -> io::Result<()> {
self.tun.set_nonblock()
}

/// Recv a packet from tun device
pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.tun.recv(buf)
}

/// Send a packet to tun device
pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.tun.send(buf)
}
}

impl Read for Device {
Expand Down
10 changes: 10 additions & 0 deletions src/platform/macos/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,16 @@ impl Device {
self.route = Some(route);
Ok(())
}

/// Recv a packet from tun device
pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.tun.recv(buf)
}

/// Send a packet to tun device
pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.tun.send(buf)
}
}

impl Read for Device {
Expand Down
68 changes: 68 additions & 0 deletions src/platform/posix/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,39 @@ pub struct Reader {
pub(crate) fd: Arc<Fd>,
pub(crate) offset: usize,
pub(crate) buf: Vec<u8>,
pub(crate) mtu: u16,
}

impl Reader {
pub(crate) fn set_mtu(&mut self, value: u16) {
self.mtu = value;
self.buf.resize(value as usize + self.offset, 0);
}

pub(crate) fn recv(&self, mut in_buf: &mut [u8]) -> io::Result<usize> {
const STACK_BUF_LEN: usize = crate::DEFAULT_MTU as usize + PIL;
let in_buf_len = in_buf.len() + self.offset;

// The following logic is to prevent dynamically allocating Vec on every recv
// As long as the MTU is set to value lesser than 1500, this api uses `stack_buf`
// and avoids `Vec` allocation
let local_buf = if in_buf_len > STACK_BUF_LEN && self.offset != 0 {
&mut vec![0u8; in_buf_len][..]
} else {
&mut [0u8; STACK_BUF_LEN]
};

let either_buf = if self.offset != 0 {
&mut *local_buf
} else {
&mut *in_buf
};
let amount = self.fd.read(either_buf)?;
if self.offset != 0 {
in_buf.put_slice(&local_buf[self.offset..amount]);
}
Ok(amount - self.offset)
}
}

impl Read for Reader {
Expand Down Expand Up @@ -106,12 +133,43 @@ pub struct Writer {
pub(crate) fd: Arc<Fd>,
pub(crate) offset: usize,
pub(crate) buf: Vec<u8>,
pub(crate) mtu: u16,
}

impl Writer {
pub(crate) fn set_mtu(&mut self, value: u16) {
self.mtu = value;
self.buf.resize(value as usize + self.offset, 0);
}

pub(crate) fn send(&self, in_buf: &[u8]) -> io::Result<usize> {
const STACK_BUF_LEN: usize = crate::DEFAULT_MTU as usize + PIL;
let in_buf_len = in_buf.len() + self.offset;

// The following logic is to prevent dynamically allocating Vec on every send
// As long as the MTU is set to value lesser than 1500, this api uses `stack_buf`
// and avoids `Vec` allocation
let local_buf = if in_buf_len > STACK_BUF_LEN && self.offset != 0 {
&mut vec![0u8; in_buf_len][..]
} else {
&mut [0u8; STACK_BUF_LEN]
};

let either_buf = if self.offset != 0 {
let ipv6 = is_ipv6(in_buf)?;
if let Some(header) = generate_packet_information(true, ipv6) {
(&mut local_buf[..self.offset]).put_slice(header.as_ref());
(&mut local_buf[self.offset..in_buf_len]).put_slice(in_buf);
local_buf
} else {
in_buf
}
} else {
in_buf
};
let amount = self.fd.write(either_buf)?;
Ok(amount - self.offset)
}
}

impl Write for Writer {
Expand Down Expand Up @@ -163,11 +221,13 @@ impl Tun {
fd: fd.clone(),
offset,
buf: vec![0; mtu as usize + offset],
mtu,
},
writer: Writer {
fd,
offset,
buf: vec![0; mtu as usize + offset],
mtu,
},
mtu,
packet_information,
Expand All @@ -191,6 +251,14 @@ impl Tun {
pub fn packet_information(&self) -> bool {
self.packet_information
}

pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.reader.recv(buf)
}

pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.writer.send(buf)
}
}

impl Read for Tun {
Expand Down
20 changes: 20 additions & 0 deletions src/platform/windows/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ impl Device {
let tun = Arc::new(self.tun);
(Reader(tun.clone()), Writer(tun))
}

/// Recv a packet from tun device
pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.tun.recv(buf)
}

/// Send a packet to tun device
pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.tun.send(buf)
}
}

impl Read for Device {
Expand Down Expand Up @@ -248,6 +258,16 @@ impl Tun {
},
}
}

/// Recv a packet from tun device
pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.read_by_ref(buf)
}

/// Send a packet to tun device
pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.write_by_ref(buf)
}
}

impl Read for Tun {
Expand Down

0 comments on commit 93af5c3

Please sign in to comment.