diff --git a/Cargo.toml b/Cargo.toml index b7a19792..daa603e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ log = { version = "0.4.20", features = [ "max_level_trace", "release_max_level_debug", ] } +nix = { version = "0.28.0", features = ["fs"] } packed_struct = "0.10.1" serde = { version = "1.0.192", features = ["derive"] } serde_yaml = "0.9.27" diff --git a/src/input/composite_device/mod.rs b/src/input/composite_device/mod.rs index 2c4db12f..42f70ac8 100644 --- a/src/input/composite_device/mod.rs +++ b/src/input/composite_device/mod.rs @@ -1,12 +1,14 @@ use std::{ - collections::{HashMap, HashSet}, + borrow::Borrow, + collections::{BTreeSet, HashMap, HashSet}, error::Error, }; +use evdev::InputEvent; use tokio::{ sync::{broadcast, mpsc}, task::JoinSet, - time::{sleep, Duration}, + time::Duration, }; use zbus::{fdo, Connection}; use zbus_macros::dbus_interface; @@ -23,12 +25,15 @@ use crate::{ Event, }, manager::SourceDeviceInfo, + output_event::UinputOutputEvent, source::{self, SourceDevice}, target::TargetCommand, }, udev::{hide_device, unhide_device}, }; +use super::{output_event::OutputEvent, source::SourceCommand}; + /// Size of the command channel buffer for processing input events and commands. const BUFFER_SIZE: usize = 2048; @@ -51,6 +56,7 @@ pub enum InterceptMode { #[derive(Debug, Clone)] pub enum Command { ProcessEvent(String, Event), + ProcessOutputEvent(OutputEvent), GetCapabilities(mpsc::Sender>), SetInterceptMode(InterceptMode), GetInterceptMode(mpsc::Sender), @@ -280,8 +286,11 @@ pub struct CompositeDevice { tx: broadcast::Sender, /// Receiver channel for listening for commands rx: broadcast::Receiver, + /// Map of source device id to their respective transmitter channel. + /// E.g. {"evdev://event0": } + source_devices: HashMap>, /// Source devices that this composite device will consume. - source_devices: Vec, + source_devices_discovered: Vec, /// HashSet of source devices that are blocked from passing their input events to target /// events. source_devices_blocked: HashSet, @@ -297,6 +306,12 @@ pub struct CompositeDevice { /// Map of DBusDevice DBus paths to their respective transmitter channel. /// E.g. {"/org/shadowblip/InputPlumber/devices/target/dbus0": } target_dbus_devices: HashMap>, + /// Set of available Force Feedback effect IDs that are not in use + ff_effect_ids: BTreeSet, + /// Source devices use their own IDs for uploaded force feedback effects. + /// This mapping maps the composite device effect ids to source device effect ids. + /// E.g. {3: {"evdev://event0": 6}} + ff_effect_id_source_map: HashMap>, } impl CompositeDevice { @@ -322,13 +337,16 @@ impl CompositeDevice { intercept_mode: InterceptMode::None, tx, rx, - source_devices: Vec::new(), + source_devices: HashMap::new(), + source_devices_discovered: Vec::new(), source_devices_blocked: HashSet::new(), source_device_paths: Vec::new(), source_device_tasks: JoinSet::new(), source_devices_used: Vec::new(), target_devices: HashMap::new(), target_dbus_devices: HashMap::new(), + ff_effect_ids: (0..64).collect(), + ff_effect_id_source_map: HashMap::new(), }; // Load the capability map if one was defined @@ -390,6 +408,16 @@ impl CompositeDevice { self.run_source_devices().await?; // Keep track of all target devices + for target in targets.values() { + if let Err(e) = target + .send(TargetCommand::SetCompositeDevice(self.tx.clone())) + .await + { + return Err( + format!("Failed to set composite device for target device: {:?}", e).into(), + ); + } + } self.target_devices = targets; // Loop and listen for command events @@ -402,6 +430,11 @@ impl CompositeDevice { log::error!("Failed to process event: {:?}", e); } } + Command::ProcessOutputEvent(event) => { + if let Err(e) = self.process_output_event(event).await { + log::error!("Failed to process output event: {:?}", e); + } + } Command::GetCapabilities(sender) => { if let Err(e) = sender.send(self.capabilities.clone()).await { log::error!("Failed to send capabilities: {:?}", e); @@ -577,13 +610,15 @@ impl CompositeDevice { log::debug!("Starting new source devices"); // Start listening for events from all source devices - let sources = self.source_devices.drain(..); + let sources = self.source_devices_discovered.drain(..); for source in sources { match source { // If the source device is an event device (i.e. from /dev/input/eventXX), // then start listening for inputs from that device. - SourceDevice::EventDevice(device) => { + SourceDevice::EventDevice(mut device) => { let device_id = device.get_id(); + let source_tx = device.transmitter(); + self.source_devices.insert(device_id.clone(), source_tx); let tx = self.tx.clone(); self.source_device_tasks.spawn(async move { if let Err(e) = device.run().await { @@ -600,6 +635,8 @@ impl CompositeDevice { // then start listening for inputs from that device. SourceDevice::HIDRawDevice(device) => { let device_id = device.get_id(); + let source_tx = device.transmitter(); + self.source_devices.insert(device_id.clone(), source_tx); let tx = self.tx.clone(); self.source_device_tasks.spawn(async move { if let Err(e) = device.run().await { @@ -616,6 +653,8 @@ impl CompositeDevice { // then start listening for inputs from that device. SourceDevice::IIODevice(device) => { let device_id = device.get_id(); + let source_tx = device.transmitter(); + self.source_devices.insert(device_id.clone(), source_tx); let tx = self.tx.clone(); self.source_device_tasks.spawn(async move { if let Err(e) = device.run().await { @@ -679,6 +718,150 @@ impl CompositeDevice { Ok(()) } + /// Process a single output event from a target device. + async fn process_output_event(&mut self, event: OutputEvent) -> Result<(), Box> { + log::trace!("Received output event: {:?}", event); + + // Handle any output events that need to upload FF effect data + if let OutputEvent::Uinput(uinput) = event.borrow() { + match uinput { + UinputOutputEvent::FFUpload(data, target_dev) => { + // Upload the effect data to the source devices + let mut source_effect_ids = HashMap::new(); + for (source_id, source) in self.source_devices.iter() { + log::debug!("Uploading effect to {source_id}"); + let (tx, rx) = std::sync::mpsc::channel(); + source.send(SourceCommand::UploadEffect(*data, tx)).await?; + + // Wait for the result of the upload + match rx.recv_timeout(Duration::from_secs(1)) { + Ok(upload_result) => { + if let Err(e) = upload_result { + log::debug!( + "Failed to upload FF effect to {source_id}: {:?}", + e + ); + continue; + } + let source_effect_id = upload_result.unwrap(); + log::debug!("Successfully uploaded effect with source effect id {source_effect_id}"); + source_effect_ids.insert(source_id.clone(), source_effect_id); + } + Err(err) => { + log::error!( + "Failed to receive response from source device {source_id} to upload effect: {:?}", + err + ); + } + } + } + + // If no source devices uploaded the effect, don't bother + // allocating an effect id. + if source_effect_ids.is_empty() { + log::debug!("No source device available to handle FF effect"); + target_dev.send(None)?; + } + + // If upload was successful, return an effect ID + let id = self.ff_effect_ids.iter().next().copied(); + if let Some(id) = id { + log::debug!("Uploaded effect with effect id {id}"); + self.ff_effect_ids.remove(&id); + self.ff_effect_id_source_map.insert(id, source_effect_ids); + target_dev.send(Some(id))?; + } else { + target_dev.send(None)?; + } + } + UinputOutputEvent::FFErase(effect_id) => { + let effect_id = *effect_id as i16; + // Erase the effect from source devices + if let Some(source_effect_ids) = self.ff_effect_id_source_map.get(&effect_id) { + for (source_id, source_effect_id) in source_effect_ids.iter() { + let Some(source) = self.source_devices.get(source_id) else { + continue; + }; + log::debug!("Erasing effect from {source_id}"); + let (tx, rx) = std::sync::mpsc::channel(); + source + .send(SourceCommand::EraseEffect(*source_effect_id, tx)) + .await?; + + // Wait for the result of the erase + match rx.recv_timeout(Duration::from_secs(1)) { + Ok(erase_result) => { + if let Err(e) = erase_result { + log::debug!( + "Failed to erase FF effect from {source_id}: {:?}", + e + ); + continue; + } + } + Err(err) => { + log::error!("Failed to receive response from source device {source_id} to erase effect: {:?}", err); + } + } + } + } + + // Add the effect ID to list of available effect ids + log::debug!("Erased effect with effect id {effect_id}"); + self.ff_effect_ids.insert(effect_id); + self.ff_effect_id_source_map.remove(&effect_id); + } + } + + log::trace!("Available effect IDs: {:?}", self.ff_effect_ids); + log::debug!("Used effect IDs: {:?}", self.ff_effect_id_source_map); + + return Ok(()); + } + + // TODO: Only write the event to devices that are capabile of handling it + for (source_id, source) in self.source_devices.iter() { + // If this is a force feedback event, translate the effect id into + // the source device's effect id. + if let OutputEvent::Evdev(input_event) = event { + if input_event.event_type().0 == evdev::EventType::FORCEFEEDBACK.0 { + // Lookup the source effect ids for the effect + let effect_id = input_event.code() as i16; + let value = input_event.value(); + let Some(source_effect_ids) = self.ff_effect_id_source_map.get(&effect_id) + else { + log::warn!("Received FF event with unknown id: {effect_id}"); + continue; + }; + + // Lookup the source effect id for this source device + let Some(source_effect_id) = source_effect_ids.get(source_id) else { + log::warn!("Unable to find source effect id for effect {effect_id} from {source_id}"); + continue; + }; + + // Create a new FF event with the source device effect id. + let new_event = InputEvent::new_now( + evdev::EventType::FORCEFEEDBACK.0, + *source_effect_id as u16, + value, + ); + let output_event = OutputEvent::Evdev(new_event); + + // Write the FF event to the source device + let event = SourceCommand::WriteEvent(output_event); + source.send(event).await?; + continue; + } + } + + let event = SourceCommand::WriteEvent(event.clone()); + source.send(event.clone()).await?; + } + + Ok(()) + } + /// Translate and write the given event to the appropriate target devices async fn handle_event(&mut self, event: NativeEvent) -> Result<(), Box> { // Check if we need to reverse the event list. @@ -1089,7 +1272,7 @@ impl CompositeDevice { let id = device.get_id(); let device_path = device.get_device_path(); let source_device = source::SourceDevice::EventDevice(device); - self.source_devices.push(source_device); + self.source_devices_discovered.push(source_device); self.source_device_paths.push(device_path); self.source_devices_used.push(id.clone()); @@ -1119,7 +1302,7 @@ impl CompositeDevice { let id = device.get_id(); let device_path = device.get_device_path(); let source_device = source::SourceDevice::HIDRawDevice(device); - self.source_devices.push(source_device); + self.source_devices_discovered.push(source_device); self.source_device_paths.push(device_path); self.source_devices_used.push(id.clone()); @@ -1148,7 +1331,7 @@ impl CompositeDevice { let id = device.get_id(); let device_path = device.get_device_path(); let source_device = source::SourceDevice::IIODevice(device); - self.source_devices.push(source_device); + self.source_devices_discovered.push(source_device); self.source_device_paths.push(device_path); self.source_devices_used.push(id.clone()); diff --git a/src/input/event/mod.rs b/src/input/event/mod.rs index 9d164167..58da8068 100644 --- a/src/input/event/mod.rs +++ b/src/input/event/mod.rs @@ -3,6 +3,7 @@ pub mod evdev; pub mod native; pub mod value; +/// Events are events that flow from source devices to target devices #[derive(Debug, Clone)] pub enum Event { Evdev(evdev::EvdevEvent), diff --git a/src/input/mod.rs b/src/input/mod.rs index ffa32a20..b2e2445e 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -3,5 +3,7 @@ pub mod capability; pub mod composite_device; pub mod event; pub mod manager; +pub mod output_capability; +pub mod output_event; pub mod source; pub mod target; diff --git a/src/input/output_capability.rs b/src/input/output_capability.rs new file mode 100644 index 00000000..c5debc07 --- /dev/null +++ b/src/input/output_capability.rs @@ -0,0 +1,16 @@ +/// Output capabilities describe what kind of output events a source input device +/// is capable of handling. E.g. Force Feedback, LED control, etc. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum OutputCapability { + ForceFeedback, + #[allow(clippy::upper_case_acronyms)] + LED(LED), +} + +/// LED capability +#[allow(clippy::upper_case_acronyms)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum LED { + Brightness, + Color, +} diff --git a/src/input/output_event/mod.rs b/src/input/output_event/mod.rs new file mode 100644 index 00000000..e1dddbcd --- /dev/null +++ b/src/input/output_event/mod.rs @@ -0,0 +1,19 @@ +use std::sync::mpsc::Sender; + +use ::evdev::{FFEffectData, InputEvent}; + +/// Output events are events that flow from target devices back to source devices +#[derive(Debug, Clone)] +pub enum OutputEvent { + Evdev(InputEvent), + Uinput(UinputOutputEvent), +} + +#[derive(Debug, Clone)] +pub enum UinputOutputEvent { + /// Effect data to upload to a source device and a channel to send back + /// the effect ID. + FFUpload(FFEffectData, Sender>), + /// Effect id to erase + FFErase(u32), +} diff --git a/src/input/source/evdev.rs b/src/input/source/evdev.rs index a8bf55e8..2822b095 100644 --- a/src/input/source/evdev.rs +++ b/src/input/source/evdev.rs @@ -1,7 +1,21 @@ -use std::{collections::HashMap, error::Error}; +use std::{ + any::Any, + collections::HashMap, + error::Error, + os::fd::AsRawFd, + sync::{Arc, Mutex}, + time::Duration, +}; -use evdev::{AbsoluteAxisCode, Device, EventType, InputEvent}; -use tokio::sync::broadcast; +use evdev::{AbsoluteAxisCode, Device, EventType, FFEffect, InputEvent}; +use nix::fcntl::{FcntlArg, OFlag}; +use tokio::{ + sync::{ + broadcast, + mpsc::{self, error::TryRecvError}, + }, + time::sleep, +}; use zbus::{fdo, Connection}; use zbus_macros::dbus_interface; @@ -11,10 +25,18 @@ use crate::{ capability::Capability, composite_device::Command, event::{evdev::EvdevEvent, Event}, + output_event::OutputEvent, }, procfs, }; +use super::SourceCommand; + +/// Size of the [SourceCommand] buffer for receiving output events +const BUFFER_SIZE: usize = 2048; +/// How long to sleep before polling for events. +const POLL_RATE: Duration = Duration::from_micros(1666); + /// The [DBusInterface] provides a DBus interface that can be exposed for managing /// a [Manager]. It works by sending command messages to a channel that the /// [Manager] is listening on. @@ -88,20 +110,41 @@ impl DBusInterface { pub struct EventDevice { info: procfs::device::Device, composite_tx: broadcast::Sender, + tx: mpsc::Sender, + rx: mpsc::Receiver, + ff_effects: HashMap, } impl EventDevice { pub fn new(info: procfs::device::Device, composite_tx: broadcast::Sender) -> Self { - Self { info, composite_tx } + let (tx, rx) = mpsc::channel(BUFFER_SIZE); + Self { + info, + composite_tx, + tx, + rx, + ff_effects: HashMap::new(), + } + } + + /// Returns a transmitter channel that can be used to send events to this device + pub fn transmitter(&self) -> mpsc::Sender { + self.tx.clone() } /// Run the source device handler - pub async fn run(&self) -> Result<(), Box> { + pub async fn run(&mut self) -> Result<(), Box> { let path = self.get_device_path(); log::debug!("Opening device at: {}", path); let mut device = Device::open(path.clone())?; device.grab()?; + // Set the device to do non-blocking reads + // TODO: use epoll to wake up when data is available + // https://github.com/emberian/evdev/blob/main/examples/evtest_nonblocking.rs + let raw_fd = device.as_raw_fd(); + nix::fcntl::fcntl(raw_fd, FcntlArg::F_SETFL(OFlag::O_NONBLOCK))?; + // Query information about the device to get the absolute ranges let mut axes_info = HashMap::new(); for (axis, info) in device.get_absinfo()? { @@ -110,36 +153,130 @@ impl EventDevice { axes_info.insert(axis, info); } - // Read events from the device and send them to the composite device + // Loop to read events from the device and commands over the channel log::debug!("Reading events from {}", path); - let mut events = device.into_event_stream()?; - while let Ok(event) = events.next_event().await { - log::trace!("Received event: {:?}", event); - // If this is an ABS event, get the min/max info for this type of - // event so we can normalize the value. - let abs_info = if event.event_type() == EventType::ABSOLUTE { - axes_info.get(&AbsoluteAxisCode(event.code())) - } else { - None + 'main: loop { + // Read events from the device + let events = { + let result = device.fetch_events(); + match result { + Ok(events) => events.collect(), + Err(err) => match err.kind() { + // Do nothing if this would block + std::io::ErrorKind::WouldBlock => vec![], + _ => { + log::trace!("Failed to fetch events: {:?}", err); + let msg = format!("Failed to fetch events: {:?}", err); + drop(err); + return Err(msg.into()); + } + }, + } }; - // Convert the event into an [EvdevEvent] and optionally include - // the axis information with min/max values - let mut evdev_event: EvdevEvent = event.into(); - if let Some(info) = abs_info { - evdev_event.set_abs_info(*info); + for event in events { + log::trace!("Received event: {:?}", event); + // If this is an ABS event, get the min/max info for this type of + // event so we can normalize the value. + let abs_info = if event.event_type() == EventType::ABSOLUTE { + axes_info.get(&AbsoluteAxisCode(event.code())) + } else { + None + }; + + // Convert the event into an [EvdevEvent] and optionally include + // the axis information with min/max values + let mut evdev_event: EvdevEvent = event.into(); + if let Some(info) = abs_info { + evdev_event.set_abs_info(*info); + } + + // Send the event to the composite device + let event = Event::Evdev(evdev_event); + self.composite_tx + .send(Command::ProcessEvent(self.get_id(), event))?; + } + + // Read commands sent to this device from the channel until it is + // empty. + loop { + match self.rx.try_recv() { + Ok(cmd) => match cmd { + SourceCommand::UploadEffect(data, composite_dev) => { + self.upload_ff_effect(&mut device, data, composite_dev); + } + SourceCommand::EraseEffect(id, composite_dev) => { + self.erase_ff_effect(id, composite_dev); + } + SourceCommand::WriteEvent(event) => { + log::debug!("Received output event: {:?}", event); + if let OutputEvent::Evdev(input_event) = event { + if let Err(e) = device.send_events(&[input_event]) { + log::error!("Failed to write output event: {:?}", e); + break 'main; + } + } + } + SourceCommand::Stop => break 'main, + }, + Err(e) => match e { + TryRecvError::Empty => break, + TryRecvError::Disconnected => { + log::debug!("Receive channel disconnected"); + break 'main; + } + }, + }; } - // Send the event to the composite device - let event = Event::Evdev(evdev_event); - self.composite_tx - .send(Command::ProcessEvent(self.get_id(), event))?; + // Sleep for the polling time + sleep(POLL_RATE).await; } + log::debug!("Stopped reading device events"); Ok(()) } + /// Upload the given effect data to the device and send the result to + /// the composite device. + fn upload_ff_effect( + &mut self, + device: &mut Device, + data: evdev::FFEffectData, + composite_dev: std::sync::mpsc::Sender>>, + ) { + log::debug!("Uploading FF effect data"); + let res = match device.upload_ff_effect(data) { + Ok(effect) => { + let id = effect.id() as i16; + self.ff_effects.insert(id, effect); + composite_dev.send(Ok(id)) + } + Err(e) => { + let err = format!("Failed to upload effect: {:?}", e); + composite_dev.send(Err(err.into())) + } + }; + if let Err(err) = res { + log::error!("Failed to send upload result: {:?}", err); + } + } + + /// Erase the effect from the device with the given effect id and send the + /// result to the composite device. + fn erase_ff_effect( + &mut self, + id: i16, + composite_dev: std::sync::mpsc::Sender>>, + ) { + log::debug!("Erasing FF effect data"); + self.ff_effects.remove(&id); + if let Err(err) = composite_dev.send(Ok(())) { + log::error!("Failed to send erase result: {:?}", err); + } + } + /// Returns a unique identifier for the source device. pub fn get_id(&self) -> String { format!("evdev://{}", self.get_event_handler()) @@ -227,6 +364,14 @@ impl EventDevice { Ok(capabilities) } + + /// Returns the output capabilities (such as Force Feedback) that this source + /// device can fulfill. + pub fn get_output_capabilities(&self) -> Result, Box> { + let capabilities = vec![]; + + Ok(capabilities) + } } /// Returns the DBus object path for evdev devices diff --git a/src/input/source/hidraw.rs b/src/input/source/hidraw.rs index 12414b5b..f97aa9b6 100644 --- a/src/input/source/hidraw.rs +++ b/src/input/source/hidraw.rs @@ -4,7 +4,7 @@ pub mod steam_deck; use std::error::Error; use hidapi::{DeviceInfo, HidApi}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use zbus::{fdo, Connection}; use zbus_macros::dbus_interface; @@ -14,6 +14,10 @@ use crate::{ input::{capability::Capability, composite_device::Command}, }; +use super::SourceCommand; +/// Size of the [SourceCommand] buffer for receiving output events +const BUFFER_SIZE: usize = 2048; + /// DBusInterface exposing information about a HIDRaw device pub struct DBusInterface { info: DeviceInfo, @@ -91,11 +95,24 @@ pub fn get_dbus_path(device_path: String) -> String { pub struct HIDRawDevice { info: DeviceInfo, composite_tx: broadcast::Sender, + tx: mpsc::Sender, + rx: mpsc::Receiver, } impl HIDRawDevice { pub fn new(info: DeviceInfo, composite_tx: broadcast::Sender) -> Self { - Self { info, composite_tx } + let (tx, rx) = mpsc::channel(BUFFER_SIZE); + Self { + info, + composite_tx, + tx, + rx, + } + } + + /// Returns a transmitter channel that can be used to send events to this device + pub fn transmitter(&self) -> mpsc::Sender { + self.tx.clone() } /// Run the source device handler. HIDRaw devices require device-specific diff --git a/src/input/source/iio.rs b/src/input/source/iio.rs index 297678aa..7d672bc4 100644 --- a/src/input/source/iio.rs +++ b/src/input/source/iio.rs @@ -3,7 +3,7 @@ pub mod bmi_imu; use std::error::Error; use glob_match::glob_match; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use zbus::{fdo, Connection}; use zbus_macros::dbus_interface; @@ -13,6 +13,11 @@ use crate::{ input::{capability::Capability, composite_device::Command}, }; +use super::SourceCommand; + +/// Size of the [SourceCommand] buffer for receiving output events +const BUFFER_SIZE: usize = 2048; + /// DBusInterface exposing information about a IIO device pub struct DBusInterface { info: Device, @@ -58,11 +63,24 @@ pub fn get_dbus_path(id: String) -> String { pub struct IIODevice { info: Device, composite_tx: broadcast::Sender, + tx: mpsc::Sender, + rx: mpsc::Receiver, } impl IIODevice { pub fn new(info: Device, composite_tx: broadcast::Sender) -> Self { - Self { info, composite_tx } + let (tx, rx) = mpsc::channel(BUFFER_SIZE); + Self { + info, + composite_tx, + tx, + rx, + } + } + + /// Returns a transmitter channel that can be used to send events to this device + pub fn transmitter(&self) -> mpsc::Sender { + self.tx.clone() } pub fn get_capabilities(&self) -> Result, Box> { diff --git a/src/input/source/mod.rs b/src/input/source/mod.rs index 80ea473d..5b612157 100644 --- a/src/input/source/mod.rs +++ b/src/input/source/mod.rs @@ -1,3 +1,9 @@ +use std::{error::Error, sync::mpsc::Sender}; + +use ::evdev::FFEffectData; + +use super::output_event::OutputEvent; + pub mod evdev; pub mod hidraw; pub mod iio; @@ -9,3 +15,16 @@ pub enum SourceDevice { HIDRawDevice(hidraw::HIDRawDevice), IIODevice(iio::IIODevice), } + +/// A [SourceCommand] is a message that can be sent to a [SourceDevice] over +/// a channel. +#[derive(Debug, Clone)] +pub enum SourceCommand { + WriteEvent(OutputEvent), + UploadEffect( + FFEffectData, + Sender>>, + ), + EraseEffect(i16, Sender>>), + Stop, +} diff --git a/src/input/target/dbus.rs b/src/input/target/dbus.rs index e7bfc92d..97717f72 100644 --- a/src/input/target/dbus.rs +++ b/src/input/target/dbus.rs @@ -61,7 +61,7 @@ pub struct DBusDevice { dbus_path: Option, tx: mpsc::Sender, rx: mpsc::Receiver, - _composite_tx: Option>, + composite_tx: Option>, } impl DBusDevice { @@ -72,7 +72,7 @@ impl DBusDevice { state: State::default(), conn, dbus_path: None, - _composite_tx: None, + composite_tx: None, tx, rx, } @@ -88,6 +88,12 @@ impl DBusDevice { self.tx.clone() } + /// Configures the device to send output events to the given composite device + /// channel. + pub fn set_composite_device(&mut self, tx: broadcast::Sender) { + self.composite_tx = Some(tx); + } + /// Creates a new instance of the dbus device interface on DBus. pub async fn listen_on_dbus(&mut self, path: String) -> Result<(), Box> { let conn = self.conn.clone(); @@ -109,6 +115,9 @@ impl DBusDevice { log::debug!("Started listening for events to send"); while let Some(command) = self.rx.recv().await { match command { + TargetCommand::SetCompositeDevice(tx) => { + self.set_composite_device(tx); + } TargetCommand::WriteEvent(event) => { //log::debug!("Got event to emit: {:?}", event); let dbus_events = self.translate_event(event); diff --git a/src/input/target/gamepad.rs b/src/input/target/gamepad.rs index 607095a9..771f3150 100644 --- a/src/input/target/gamepad.rs +++ b/src/input/target/gamepad.rs @@ -1,25 +1,43 @@ //! The GenericGamepad target provides a simple generic virtual gamepad based //! on the XBox 360 gamepad. -use std::{collections::HashMap, error::Error}; +use std::{ + collections::HashMap, + error::Error, + ops::DerefMut, + os::fd::AsRawFd, + sync::{Arc, Mutex}, + thread, +}; use evdev::{ uinput::{VirtualDevice, VirtualDeviceBuilder}, - AbsInfo, AbsoluteAxisCode, AttributeSet, InputEvent, KeyCode, SynchronizationCode, - SynchronizationEvent, UinputAbsSetup, + AbsInfo, AbsoluteAxisCode, AttributeSet, EventSummary, FFEffectCode, FFStatusCode, InputEvent, + KeyCode, SynchronizationCode, SynchronizationEvent, UInputCode, UinputAbsSetup, +}; +use nix::fcntl::{FcntlArg, OFlag}; +use tokio::{ + sync::{ + broadcast, + mpsc::{self, error::TryRecvError}, + }, + time::Duration, }; -use tokio::sync::{broadcast, mpsc}; use zbus::{fdo, Connection}; use zbus_macros::dbus_interface; use crate::input::{ capability::Capability, - composite_device, + composite_device::Command, event::{evdev::EvdevEvent, native::NativeEvent}, + output_event::{OutputEvent, UinputOutputEvent}, }; use super::TargetCommand; +/// Size of the [TargetCommand] buffer for receiving input events const BUFFER_SIZE: usize = 2048; +/// How long to sleep before polling for events. +const POLL_RATE: Duration = Duration::from_micros(1666); /// The [DBusInterface] provides a DBus interface that can be exposed for managing /// a [GenericGamepad]. @@ -46,7 +64,7 @@ pub struct GenericGamepad { dbus_path: Option, tx: mpsc::Sender, rx: mpsc::Receiver, - _composite_tx: Option>, + composite_tx: Option>, } impl GenericGamepad { @@ -57,7 +75,7 @@ impl GenericGamepad { dbus_path: None, tx, rx, - _composite_tx: None, + composite_tx: None, } } @@ -82,6 +100,12 @@ impl GenericGamepad { self.tx.clone() } + /// Configures the device to send output events to the given composite device + /// channel. + pub fn set_composite_device(&mut self, tx: broadcast::Sender) { + self.composite_tx = Some(tx); + } + /// Creates a new instance of the dbus device interface on DBus. pub async fn listen_on_dbus(&mut self, path: String) -> Result<(), Box> { let conn = self.conn.clone(); @@ -98,30 +122,38 @@ impl GenericGamepad { /// Creates and runs the target device pub async fn run(&mut self) -> Result<(), Box> { log::debug!("Creating virtual gamepad"); - let mut device = self.create_virtual_device()?; + let device = self.create_virtual_device()?; + + // Put the device behind an Arc Mutex so it can be shared between the + // read and write threads + let device = Arc::new(Mutex::new(device)); // Query information about the device to get the absolute ranges let axes_map = self.get_abs_info(); - // TODO: Listen for events (Force Feedback Events) - //let event_stream = device.into_event_stream()?; - - // Listen for send events - log::debug!("Started listening for events to send"); + // Listen for events from source devices + log::debug!("Started listening for events"); while let Some(command) = self.rx.recv().await { match command { + TargetCommand::SetCompositeDevice(tx) => { + self.set_composite_device(tx.clone()); + + // Spawn a thread to listen for force feedback events + let ff_device = device.clone(); + GenericGamepad::spawn_ff_thread(ff_device, tx); + } TargetCommand::WriteEvent(event) => { - log::trace!("Got event to emit: {:?}", event); + log::debug!("Got event to emit: {:?}", event); let evdev_events = self.translate_event(event, axes_map.clone()); - device.emit(evdev_events.as_slice())?; - device.emit(&[SynchronizationEvent::new( - SynchronizationCode::SYN_REPORT, - 0, - ) - .into()])?; + if let Ok(mut dev) = device.lock() { + dev.emit(evdev_events.as_slice())?; + dev.emit(&[ + SynchronizationEvent::new(SynchronizationCode::SYN_REPORT, 0).into(), + ])?; + } } TargetCommand::Stop => break, - }; + } } log::debug!("Stopping device"); @@ -206,13 +238,13 @@ impl GenericGamepad { let abs_hat0y = UinputAbsSetup::new(AbsoluteAxisCode::ABS_HAT0Y, dpad_setup); // Setup Force Feedback - //let mut ff = AttributeSet::::new(); - //ff.insert(FFEffectCode::FF_RUMBLE); - //ff.insert(FFEffectCode::FF_PERIODIC); - //ff.insert(FFEffectCode::FF_SQUARE); - //ff.insert(FFEffectCode::FF_TRIANGLE); - //ff.insert(FFEffectCode::FF_SINE); - //ff.insert(FFEffectCode::FF_GAIN); + let mut ff = AttributeSet::::new(); + ff.insert(FFEffectCode::FF_RUMBLE); + ff.insert(FFEffectCode::FF_PERIODIC); + ff.insert(FFEffectCode::FF_SQUARE); + ff.insert(FFEffectCode::FF_TRIANGLE); + ff.insert(FFEffectCode::FF_SINE); + ff.insert(FFEffectCode::FF_GAIN); // Build the device let device = VirtualDeviceBuilder::new()? @@ -226,9 +258,133 @@ impl GenericGamepad { .with_absolute_axis(&abs_rz)? .with_absolute_axis(&abs_hat0x)? .with_absolute_axis(&abs_hat0y)? - //.with_ff(&ff)? + .with_ff(&ff)? + .with_ff_effects_max(16) .build()?; + // Set the device to do non-blocking reads + // TODO: use epoll to wake up when data is available + // https://github.com/emberian/evdev/blob/main/examples/evtest_nonblocking.rs + let raw_fd = device.as_raw_fd(); + nix::fcntl::fcntl(raw_fd, FcntlArg::F_SETFL(OFlag::O_NONBLOCK))?; + Ok(device) } + + /// Spawns the force-feedback handler thread + fn spawn_ff_thread(ff_device: Arc>, tx: broadcast::Sender) { + tokio::task::spawn_blocking(move || { + loop { + // Read any events + if let Err(e) = GenericGamepad::process_ff(&ff_device, &tx) { + log::warn!("Error processing FF events: {:?}", e); + } + + // Sleep for the poll rate interval + thread::sleep(POLL_RATE); + } + }); + } + + /// Process force feedback events from the given device + fn process_ff( + device: &Arc>, + composite_dev: &broadcast::Sender, + ) -> Result<(), Box> { + // Listen for events (Force Feedback Events) + let events = match device.lock() { + Ok(mut dev) => { + let res = dev.deref_mut().fetch_events(); + match res { + Ok(events) => events.collect(), + Err(err) => match err.kind() { + // Do nothing if this would block + std::io::ErrorKind::WouldBlock => vec![], + _ => { + log::trace!("Failed to fetch events: {:?}", err); + return Err(err.into()); + } + }, + } + } + Err(err) => { + log::trace!("Failed to lock device mutex: {:?}", err); + return Err(err.to_string().into()); + } + }; + + const STOPPED: i32 = FFStatusCode::FF_STATUS_STOPPED.0 as i32; + const PLAYING: i32 = FFStatusCode::FF_STATUS_PLAYING.0 as i32; + + // Process the events + for event in events { + match event.destructure() { + EventSummary::UInput(event, UInputCode::UI_FF_UPLOAD, ..) => { + log::debug!("Got FF upload event"); + // Claim ownership of the FF upload and convert it to a FF_UPLOAD + // event + let mut event = device + .lock() + .map_err(|e| e.to_string())? + .process_ff_upload(event)?; + + log::debug!("Upload effect: {:?}", event.effect()); + + // Send the effect data to be uploaded to the device and wait + // for an effect ID to be generated. + let (tx, rx) = std::sync::mpsc::channel::>(); + let upload = + OutputEvent::Uinput(UinputOutputEvent::FFUpload(event.effect(), tx)); + if let Err(e) = composite_dev.send(Command::ProcessOutputEvent(upload)) { + event.set_retval(-1); + return Err(e.into()); + } + let effect_id = match rx.recv_timeout(Duration::from_secs(1)) { + Ok(id) => id, + Err(e) => { + event.set_retval(-1); + return Err(e.into()); + } + }; + + // Set the effect ID for the FF effect + if let Some(id) = effect_id { + event.set_effect_id(id); + event.set_retval(0); + } else { + log::warn!("Failed to get effect ID to upload FF effect"); + event.set_retval(-1); + } + } + EventSummary::UInput(event, UInputCode::UI_FF_ERASE, ..) => { + log::debug!("Got FF erase event"); + // Claim ownership of the FF erase event and convert it to a FF_ERASE + // event. + let event = device + .lock() + .map_err(|e| e.to_string())? + .process_ff_erase(event)?; + log::debug!("Erase effect: {:?}", event.effect_id()); + + let erase = OutputEvent::Uinput(UinputOutputEvent::FFErase(event.effect_id())); + composite_dev.send(Command::ProcessOutputEvent(erase))?; + } + EventSummary::ForceFeedback(.., effect_id, STOPPED) => { + log::debug!("Stopped effect ID: {}", effect_id.0); + log::debug!("Stopping event: {:?}", event); + composite_dev.send(Command::ProcessOutputEvent(OutputEvent::Evdev(event)))?; + } + EventSummary::ForceFeedback(.., effect_id, PLAYING) => { + log::debug!("Playing effect ID: {}", effect_id.0); + log::debug!("Playing event: {:?}", event); + composite_dev.send(Command::ProcessOutputEvent(OutputEvent::Evdev(event)))?; + } + _ => { + log::debug!("Unhandled event: {:?}", event); + } + } + } + + Ok(()) + } } diff --git a/src/input/target/keyboard.rs b/src/input/target/keyboard.rs index 59c110e3..9e845a9c 100644 --- a/src/input/target/keyboard.rs +++ b/src/input/target/keyboard.rs @@ -66,7 +66,7 @@ pub struct KeyboardDevice { dbus_path: Option, tx: mpsc::Sender, rx: mpsc::Receiver, - _composite_tx: Option>, + composite_tx: Option>, } impl KeyboardDevice { @@ -75,7 +75,7 @@ impl KeyboardDevice { Self { conn, dbus_path: None, - _composite_tx: None, + composite_tx: None, tx, rx, } @@ -91,6 +91,12 @@ impl KeyboardDevice { self.tx.clone() } + /// Configures the device to send output events to the given composite device + /// channel. + pub fn set_composite_device(&mut self, tx: broadcast::Sender) { + self.composite_tx = Some(tx); + } + /// Creates a new instance of the device interface on DBus. pub async fn listen_on_dbus(&mut self, path: String) -> Result<(), Box> { let conn = self.conn.clone(); @@ -115,6 +121,9 @@ impl KeyboardDevice { log::debug!("Started listening for events to send"); while let Some(command) = self.rx.recv().await { match command { + TargetCommand::SetCompositeDevice(tx) => { + self.set_composite_device(tx); + } TargetCommand::WriteEvent(event) => { //log::debug!("Got event to emit: {:?}", event); let evdev_events = self.translate_event(event, axis_map.clone()); diff --git a/src/input/target/mod.rs b/src/input/target/mod.rs index 6936d39a..b6d6a9e7 100644 --- a/src/input/target/mod.rs +++ b/src/input/target/mod.rs @@ -1,4 +1,6 @@ -use super::event::native::NativeEvent; +use tokio::sync::broadcast::Sender; + +use super::{composite_device::Command, event::native::NativeEvent}; pub mod dbus; pub mod gamepad; @@ -24,5 +26,6 @@ pub enum TargetDeviceType { #[derive(Debug, Clone)] pub enum TargetCommand { WriteEvent(NativeEvent), + SetCompositeDevice(Sender), Stop, } diff --git a/src/input/target/mouse.rs b/src/input/target/mouse.rs index 5320b801..8a2b5fd4 100644 --- a/src/input/target/mouse.rs +++ b/src/input/target/mouse.rs @@ -56,7 +56,7 @@ pub struct MouseDevice { dbus_path: Option, tx: mpsc::Sender, rx: mpsc::Receiver, - _composite_tx: Option>, + composite_tx: Option>, } impl MouseDevice { @@ -65,7 +65,7 @@ impl MouseDevice { Self { conn, dbus_path: None, - _composite_tx: None, + composite_tx: None, tx, rx, } @@ -81,6 +81,12 @@ impl MouseDevice { self.tx.clone() } + /// Configures the device to send output events to the given composite device + /// channel. + pub fn set_composite_device(&mut self, tx: broadcast::Sender) { + self.composite_tx = Some(tx); + } + /// Creates a new instance of the device interface on DBus. pub async fn listen_on_dbus(&mut self, path: String) -> Result<(), Box> { let conn = self.conn.clone(); @@ -127,6 +133,9 @@ impl MouseDevice { log::debug!("Started listening for events to send"); while let Some(command) = self.rx.recv().await { match command { + TargetCommand::SetCompositeDevice(tx) => { + self.set_composite_device(tx); + } TargetCommand::WriteEvent(event) => { log::trace!("Got event to emit: {:?}", event); diff --git a/src/input/target/steam_deck.rs b/src/input/target/steam_deck.rs index 369eabbe..ac8d68b7 100644 --- a/src/input/target/steam_deck.rs +++ b/src/input/target/steam_deck.rs @@ -56,7 +56,7 @@ pub struct SteamDeckDevice { tx: mpsc::Sender, rx: mpsc::Receiver, state: PackedInputDataReport, - _composite_tx: Option>, + composite_tx: Option>, } impl SteamDeckDevice { @@ -68,7 +68,7 @@ impl SteamDeckDevice { tx, rx, state: PackedInputDataReport::new(), - _composite_tx: None, + composite_tx: None, } } @@ -77,6 +77,12 @@ impl SteamDeckDevice { self.tx.clone() } + /// Configures the device to send output events to the given composite device + /// channel. + pub fn set_composite_device(&mut self, tx: broadcast::Sender) { + self.composite_tx = Some(tx); + } + /// Creates a new instance of the dbus device interface on DBus. pub async fn listen_on_dbus(&mut self, path: String) -> Result<(), Box> { let conn = self.conn.clone(); @@ -196,6 +202,9 @@ impl SteamDeckDevice { log::debug!("Started listening for events to send"); while let Some(command) = self.rx.recv().await { match command { + TargetCommand::SetCompositeDevice(tx) => { + self.set_composite_device(tx); + } TargetCommand::WriteEvent(event) => { // Update internal state self.update_state(event);