diff --git a/src/worker/network.rs b/src/worker/network.rs index e1010bb..369ef39 100644 --- a/src/worker/network.rs +++ b/src/worker/network.rs @@ -82,6 +82,159 @@ impl NetworkWorker { Ok(()) } + + fn start_client( + &self, + addr: Ipv4Address, + target_port: u16, + nconnections: u32, + arrival_rate: f64, + departure_rate: f64, + ) -> Result<(), WorkerError> { + let (mut iface, mut device, fd) = self.setup_tap(addr); + let cx = iface.context(); + + // Open static set of connections, that are going to live throughout + // the whole run + let mut sockets = SocketSet::new(vec![]); + + for _i in 0..nconnections { + let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 1024]); + let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 1024]); + let tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer); + + sockets.add(tcp_socket); + } + + for (i, socket) in sockets + .iter_mut() + .filter_map(|(_h, s)| tcp::Socket::downcast_mut(s)) + .enumerate() + { + let index = i; + let (local_addr, local_port) = + self.get_local_addr_port(addr, index); + info!("connecting from {}:{}", local_addr, local_port); + socket + .connect(cx, (addr, target_port), (local_addr, local_port)) + .unwrap(); + } + + // The main loop, where connection state will be updated, and dynamic + // connections will be opened/closed + loop { + let timestamp = Instant::now(); + iface.poll(timestamp, &mut device, &mut sockets); + let cx = iface.context(); + + // Iterate through all sockets, update the state for each one + for (i, (h, s)) in sockets.iter_mut().enumerate() { + let socket = tcp::Socket::downcast_mut(s) + .ok_or(WorkerError::Internal)?; + + trace!("Process socket {}", i); + if socket.can_recv() { + socket + .recv(|data| { + trace!( + "{}", + str::from_utf8(data) + .unwrap_or("(invalid utf8)") + ); + (data.len(), ()) + }) + .unwrap(); + } + + if socket.may_send() { + let response = format!("hello {}\n", i); + let binary = response.as_bytes(); + trace!( + "sending request from idx {} addr {}, data {:?}", + i, + socket.local_endpoint().unwrap().addr, + binary + ); + socket.send_slice(binary).expect("cannot send"); + } + } + + // Ideally we need to wait for iface.poll_delay(timestamp, &sockets) + // interval, but we may stuck without any activity making no + // progress. Since opening new connections and specifying their + // lifetime depends on constant timer check, wait for regular + // intervals. + phy_wait(fd, Some(smoltcp::time::Duration::from_millis(100))) + .expect("wait error"); + } + } + + /// Setup a tap device for communication, wrapped into a Tracer + /// and a FaultInjector. + fn setup_tap( + &self, + addr: Ipv4Address, + ) -> (Interface, FaultInjector>, i32) { + let tap = "tap0"; + let device = TunTapInterface::new(&tap, Medium::Ethernet).unwrap(); + let fd = device.as_raw_fd(); + + let seed = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .subsec_nanos(); + + let device = Tracer::new(device, |_timestamp, _printer| { + trace!("{}", _printer); + }); + + let mut device = FaultInjector::new(device, seed); + + // Create interface + let mut config = match device.capabilities().medium { + Medium::Ethernet => Config::new( + EthernetAddress([0x02, 0x00, 0x00, 0x00, 0x00, 0x01]).into(), + ), + Medium::Ip => Config::new(smoltcp::wire::HardwareAddress::Ip), + Medium::Ieee802154 => todo!(), + }; + config.random_seed = rand::random(); + + let mut iface = Interface::new(config, &mut device, Instant::now()); + iface.set_any_ip(true); + iface.update_ip_addrs(|ip_addrs| { + ip_addrs + .push(IpCidr::new(IpAddress::Ipv4(addr), 16)) + .unwrap(); + }); + + iface.routes_mut().add_default_ipv4_route(addr).unwrap(); + + (iface, device, fd) + } + + /// Map socket index to a local port and address. The address octets are + /// incremented every 100 sockets, whithin this interval the local port + /// is incremented. + fn get_local_addr_port( + &self, + addr: Ipv4Address, + index: usize, + ) -> (IpAddress, u16) { + // 254 (a2 octet) * 254 (a3 octet) * 100 (port) + // gives us maximum 6451600 connections that could be opened + let local_port = 49152 + (index % 100) as u16; + debug!("addr {}, index {}", addr, index); + + let local_addr = IpAddress::v4( + addr.0[0], + addr.0[1], + (((index / 100) + 2) / 255) as u8, + (((index / 100) + 2) % 255) as u8, + ); + + return (local_addr, local_port); + } } impl Worker for NetworkWorker { @@ -104,6 +257,14 @@ impl Worker for NetworkWorker { if server { let _ = self.start_server(ip_addr, target_port); + } else { + let _ = self.start_client( + ip_addr, + target_port, + nconnections, + arrival_rate, + departure_rate, + ); } Ok(())