diff --git a/crates/vsock/src/vhu_vsock_thread.rs b/crates/vsock/src/vhu_vsock_thread.rs index 371b2ff0..a3790468 100644 --- a/crates/vsock/src/vhu_vsock_thread.rs +++ b/crates/vsock/src/vhu_vsock_thread.rs @@ -621,68 +621,75 @@ impl VhostUserVsockThread { None => return Err(Error::NoMemoryConfigured), }; - while let Some(mut avail_desc) = vring - .get_mut() - .get_queue_mut() - .iter(atomic_mem.memory()) - .map_err(|_| Error::IterateQueue)? - .next() - { - used_any = true; - let mem = atomic_mem.clone().memory(); - - let head_idx = avail_desc.head_index(); - let pkt = match VsockPacket::from_tx_virtq_chain( - mem.deref(), - &mut avail_desc, - self.tx_buffer_size, - ) { - Ok(pkt) => pkt, - Err(e) => { - dbg!("vsock: error reading TX packet: {:?}", e); - continue; - } - }; - - if self.thread_backend.send_pkt(&pkt).is_err() { - vring - .get_mut() - .get_queue_mut() - .iter(mem) - .unwrap() - .go_to_previous_position(); - break; - } + let mut vring_mut = vring.get_mut(); + + let queue = vring_mut.get_queue_mut(); - // TODO: Check if the protocol requires read length to be correct - let used_len = 0; + let mut iter_has_elemnt = true; + while iter_has_elemnt { + let queue_iter = queue + .iter(atomic_mem.memory()) + .map_err(|_| Error::IterateQueue)?; - let vring = vring.clone(); - let event_idx = self.event_idx; + iter_has_elemnt = false; + for mut avail_desc in queue_iter { + iter_has_elemnt = true; + used_any = true; + let mem = atomic_mem.clone().memory(); - self.pool.spawn_ok(async move { - if event_idx { - if vring.add_used(head_idx, used_len as u32).is_err() { - warn!("Could not return used descriptors to ring"); + let head_idx = avail_desc.head_index(); + let pkt = match VsockPacket::from_tx_virtq_chain( + mem.deref(), + &mut avail_desc, + self.tx_buffer_size, + ) { + Ok(pkt) => pkt, + Err(e) => { + dbg!("vsock: error reading TX packet: {:?}", e); + continue; } - match vring.needs_notification() { - Err(_) => { - warn!("Could not check if queue needs to be notified"); - vring.signal_used_queue().unwrap(); + }; + + if self.thread_backend.send_pkt(&pkt).is_err() { + vring + .get_mut() + .get_queue_mut() + .iter(mem) + .unwrap() + .go_to_previous_position(); + break; + } + + // TODO: Check if the protocol requires read length to be correct + let used_len = 0; + + let vring = vring.clone(); + let event_idx = self.event_idx; + + self.pool.spawn_ok(async move { + if event_idx { + if vring.add_used(head_idx, used_len as u32).is_err() { + warn!("Could not return used descriptors to ring"); } - Ok(needs_notification) => { - if needs_notification { + match vring.needs_notification() { + Err(_) => { + warn!("Could not check if queue needs to be notified"); vring.signal_used_queue().unwrap(); } + Ok(needs_notification) => { + if needs_notification { + vring.signal_used_queue().unwrap(); + } + } } + } else { + if vring.add_used(head_idx, used_len as u32).is_err() { + warn!("Could not return used descriptors to ring"); + } + vring.signal_used_queue().unwrap(); } - } else { - if vring.add_used(head_idx, used_len as u32).is_err() { - warn!("Could not return used descriptors to ring"); - } - vring.signal_used_queue().unwrap(); - } - }); + }); + } } Ok(used_any)