From 555960327bef0086b5e93da50cf192ece0a47e1e Mon Sep 17 00:00:00 2001 From: Icelk Date: Tue, 28 Mar 2023 09:12:48 +0200 Subject: [PATCH] Implement opcode sendmsg (non-zc) for UdpStream. --- src/io/mod.rs | 2 + src/io/sendmsg.rs | 103 ++++++++++++++++++++++++++++++++++++++++++++++ src/io/socket.rs | 10 +++++ src/net/udp.rs | 20 +++++++++ 4 files changed, 135 insertions(+) create mode 100644 src/io/sendmsg.rs diff --git a/src/io/mod.rs b/src/io/mod.rs index c779c9b8..6985bdd3 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -31,6 +31,8 @@ mod send_to; mod send_zc; +mod sendmsg; + mod sendmsg_zc; mod shared_fd; diff --git a/src/io/sendmsg.rs b/src/io/sendmsg.rs new file mode 100644 index 00000000..385c141d --- /dev/null +++ b/src/io/sendmsg.rs @@ -0,0 +1,103 @@ +use crate::buf::BoundedBuf; +use crate::io::SharedFd; +use crate::runtime::driver::op::{Completable, CqeResult, Op}; +use crate::runtime::CONTEXT; +use socket2::SockAddr; +use std::io; +use std::io::IoSlice; +use std::net::SocketAddr; + +pub(crate) struct SendMsg { + _fd: SharedFd, + _io_bufs: Vec, + _io_slices: Vec>, + _socket_addr: Option>, + msg_control: Option, + msghdr: libc::msghdr, +} + +impl Op> { + pub(crate) fn sendmsg( + fd: &SharedFd, + io_bufs: Vec, + socket_addr: Option, + msg_control: Option, + ) -> io::Result { + use io_uring::{opcode, types}; + + let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() }; + + let mut io_slices: Vec> = Vec::with_capacity(io_bufs.len()); + + for io_buf in &io_bufs { + io_slices.push(IoSlice::new(unsafe { + std::slice::from_raw_parts(io_buf.stable_ptr(), io_buf.bytes_init()) + })) + } + + msghdr.msg_iov = io_slices.as_ptr() as *mut _; + msghdr.msg_iovlen = io_slices.len() as _; + + let socket_addr = match socket_addr { + Some(_socket_addr) => { + let socket_addr = Box::new(SockAddr::from(_socket_addr)); + msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; + msghdr.msg_namelen = socket_addr.len(); + Some(socket_addr) + } + None => { + msghdr.msg_name = std::ptr::null_mut(); + msghdr.msg_namelen = 0; + None + } + }; + + match msg_control { + Some(ref _msg_control) => { + msghdr.msg_control = _msg_control.stable_ptr() as *mut _; + msghdr.msg_controllen = _msg_control.bytes_init(); + } + None => { + msghdr.msg_control = std::ptr::null_mut(); + msghdr.msg_controllen = 0_usize; + } + } + + CONTEXT.with(|x| { + x.handle().expect("Not in a runtime context").submit_op( + SendMsg { + _fd: fd.clone(), + _io_bufs: io_bufs, + _socket_addr: socket_addr, + _io_slices: io_slices, + msg_control, + msghdr, + }, + |sendmsg| { + opcode::SendMsg::new( + types::Fd(sendmsg._fd.raw_fd()), + &sendmsg.msghdr as *const _, + ) + .build() + }, + ) + }) + } +} + +impl Completable for SendMsg { + type Output = (io::Result, Vec, Option); + + fn complete(self, cqe: CqeResult) -> (io::Result, Vec, Option) { + // Convert the operation result to `usize` + let res = cqe.result.map(|n| n as usize); + + // Recover the data buffers. + let io_bufs = self._io_bufs; + + // Recover the ancillary data buffer. + let msg_control = self.msg_control; + + (res, io_bufs, msg_control) + } +} diff --git a/src/io/socket.rs b/src/io/socket.rs index f35de7bf..dda1bb36 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -148,6 +148,16 @@ impl Socket { op.await } + pub(crate) async fn sendmsg( + &self, + io_slices: Vec, + socket_addr: Option, + msg_control: Option, + ) -> (io::Result, Vec, Option) { + let op = Op::sendmsg(&self.fd, io_slices, socket_addr, msg_control).unwrap(); + op.await + } + pub(crate) async fn sendmsg_zc( &self, io_slices: Vec, diff --git a/src/net/udp.rs b/src/net/udp.rs index 9cd8c3e9..cb0cef66 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -244,6 +244,26 @@ impl UdpSocket { self.inner.send_zc(buf).await } + /// Sends a message on the socket using a msghdr. + /// + /// Returns a tuple of: + /// + /// * Result containing bytes written on success + /// * The original `io_slices` `Vec` + /// * The original `msg_contol` `Option` + /// + /// Consider using [`Self::sendmsg_zc`] for a zero-copy alternative. + pub async fn sendmsg( + &self, + io_slices: Vec, + socket_addr: Option, + msg_control: Option, + ) -> (io::Result, Vec, Option) { + self.inner + .sendmsg(io_slices, socket_addr, msg_control) + .await + } + /// Sends a message on the socket using a msghdr. /// /// Returns a tuple of: