Skip to content

Commit

Permalink
Add client side of network worker
Browse files Browse the repository at this point in the history
Use smoltcp to open a static set of connections from userspace.
  • Loading branch information
erthalion committed Apr 9, 2024
1 parent 98e47aa commit 7635c31
Showing 1 changed file with 161 additions and 0 deletions.
161 changes: 161 additions & 0 deletions src/worker/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,159 @@ impl NetworkWorker {

Ok(())
}

fn start_client(
&self,
addr: Ipv4Address,
target_port: u16,
nconnections: u32,
arrival_rate: f64,
departure_rate: f64,
) -> Result<(), WorkerError> {
let (mut iface, mut device, fd) = self.setup_tap(addr);
let cx = iface.context();

// Open static set of connections, that are going to live throughout
// the whole run
let mut sockets = SocketSet::new(vec![]);

for _i in 0..nconnections {
let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 1024]);
let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 1024]);
let tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer);

sockets.add(tcp_socket);
}

for (i, socket) in sockets
.iter_mut()
.filter_map(|(_h, s)| tcp::Socket::downcast_mut(s))
.enumerate()
{
let index = i;
let (local_addr, local_port) =
self.get_local_addr_port(addr, index);
info!("connecting from {}:{}", local_addr, local_port);
socket
.connect(cx, (addr, target_port), (local_addr, local_port))
.unwrap();
}

// The main loop, where connection state will be updated, and dynamic
// connections will be opened/closed
loop {
let timestamp = Instant::now();
iface.poll(timestamp, &mut device, &mut sockets);
let cx = iface.context();

// Iterate through all sockets, update the state for each one
for (i, (h, s)) in sockets.iter_mut().enumerate() {
let socket = tcp::Socket::downcast_mut(s)
.ok_or(WorkerError::Internal)?;

trace!("Process socket {}", i);
if socket.can_recv() {
socket
.recv(|data| {
trace!(
"{}",
str::from_utf8(data)
.unwrap_or("(invalid utf8)")
);
(data.len(), ())
})
.unwrap();
}

if socket.may_send() {
let response = format!("hello {}\n", i);
let binary = response.as_bytes();
trace!(
"sending request from idx {} addr {}, data {:?}",
i,
socket.local_endpoint().unwrap().addr,
binary
);
socket.send_slice(binary).expect("cannot send");
}
}

// Ideally we need to wait for iface.poll_delay(timestamp, &sockets)
// interval, but we may stuck without any activity making no
// progress. Since opening new connections and specifying their
// lifetime depends on constant timer check, wait for regular
// intervals.
phy_wait(fd, Some(smoltcp::time::Duration::from_millis(100)))
.expect("wait error");
}
}

/// Setup a tap device for communication, wrapped into a Tracer
/// and a FaultInjector.
fn setup_tap(
&self,
addr: Ipv4Address,
) -> (Interface, FaultInjector<Tracer<TunTapInterface>>, i32) {
let tap = "tap0";
let device = TunTapInterface::new(&tap, Medium::Ethernet).unwrap();
let fd = device.as_raw_fd();

let seed = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.subsec_nanos();

let device = Tracer::new(device, |_timestamp, _printer| {
trace!("{}", _printer);
});

let mut device = FaultInjector::new(device, seed);

// Create interface
let mut config = match device.capabilities().medium {
Medium::Ethernet => Config::new(
EthernetAddress([0x02, 0x00, 0x00, 0x00, 0x00, 0x01]).into(),
),
Medium::Ip => Config::new(smoltcp::wire::HardwareAddress::Ip),
Medium::Ieee802154 => todo!(),
};
config.random_seed = rand::random();

let mut iface = Interface::new(config, &mut device, Instant::now());
iface.set_any_ip(true);
iface.update_ip_addrs(|ip_addrs| {
ip_addrs
.push(IpCidr::new(IpAddress::Ipv4(addr), 16))
.unwrap();
});

iface.routes_mut().add_default_ipv4_route(addr).unwrap();

(iface, device, fd)
}

/// Map socket index to a local port and address. The address octets are
/// incremented every 100 sockets, whithin this interval the local port
/// is incremented.
fn get_local_addr_port(
&self,
addr: Ipv4Address,
index: usize,
) -> (IpAddress, u16) {
// 254 (a2 octet) * 254 (a3 octet) * 100 (port)
// gives us maximum 6451600 connections that could be opened
let local_port = 49152 + (index % 100) as u16;
debug!("addr {}, index {}", addr, index);

let local_addr = IpAddress::v4(
addr.0[0],
addr.0[1],
(((index / 100) + 2) / 255) as u8,
(((index / 100) + 2) % 255) as u8,
);

return (local_addr, local_port);
}
}

impl Worker for NetworkWorker {
Expand All @@ -104,6 +257,14 @@ impl Worker for NetworkWorker {

if server {
let _ = self.start_server(ip_addr, target_port);
} else {
let _ = self.start_client(
ip_addr,
target_port,
nconnections,
arrival_rate,
departure_rate,
);
}

Ok(())
Expand Down

0 comments on commit 7635c31

Please sign in to comment.