diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 85fb2219..9598fd11 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -51,16 +51,6 @@ jobs: manifest-dir: ${{ github.workspace }}/.github/manifest github-binarycache: true - - name: Cargo Build Windows - if: matrix.os == 'windows-latest' - run: |- - $env:SODIUM_LIB_DIR="$(pwd)\vcpkg\packages\libsodium_x64-windows-release\lib" - cargo build --all-targets - - - name: Cargo Build - if: matrix.os != 'windows-latest' - run: cargo build --all-targets - - name: Cargo Build - musl if: matrix.os == 'ubuntu-latest' run: | @@ -70,15 +60,15 @@ jobs: - name: Cargo Test Windows if: matrix.os == 'windows-latest' env: - RUST_LOG: error + RUST_LOG: info RUST_BACKTRACE: 1 run: |- $env:SODIUM_LIB_DIR="$(pwd)\vcpkg\packages\libsodium_x64-windows-release\lib" - cargo test -- --nocapture + make unit - name: Cargo Test if: matrix.os != 'windows-latest' env: - RUST_LOG: error + RUST_LOG: info RUST_BACKTRACE: 1 - run: cargo test -- --nocapture + run: make unit diff --git a/Cargo.lock b/Cargo.lock index b6fdbafa..1c8c2809 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -196,6 +196,29 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bindgen" +version = "0.69.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" +dependencies = [ + "bitflags 2.5.0", + "cexpr", + "clang-sys", + "itertools", + "lazy_static", + "lazycell", + "log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.60", + "which", +] + [[package]] name = "bit_field" version = "0.10.2" @@ -285,6 +308,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -338,6 +370,17 @@ dependencies = [ "inout", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.5.4" @@ -379,6 +422,15 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" +[[package]] +name = "cmake" +version = "0.1.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb1e43aa7fd152b1f968787f7dbcdeb306d1867ff373c69955211876c053f91a" +dependencies = [ + "cc", +] + [[package]] name = "colorchoice" version = "1.0.0" @@ -426,6 +478,32 @@ dependencies = [ "memchr", ] +[[package]] +name = "cpp_build" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e361fae2caf9758164b24da3eedd7f7d7451be30d90d8e7b5d2be29a2f0cf5b" +dependencies = [ + "cc", + "cpp_common", + "lazy_static", + "proc-macro2", + "regex", + "syn 2.0.60", + "unicode-xid", +] + +[[package]] +name = "cpp_common" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e1a2532e4ed4ea13031c13bc7bc0dbca4aae32df48e9d77f0d1e743179f2ea1" +dependencies = [ + "lazy_static", + "proc-macro2", + "syn 2.0.60", +] + [[package]] name = "cpufeatures" version = "0.2.12" @@ -571,6 +649,33 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" +[[package]] +name = "datachannel" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "065c0a2f521db946447fcea36f1e6a2634633568e4f32dc1384941e455763471" +dependencies = [ + "datachannel-sys", + "derivative", + "parking_lot", + "serde", + "tracing", + "webrtc-sdp", +] + +[[package]] +name = "datachannel-sys" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a1b2f94958fd5ba1ba19d88db9757906ba7849c34cc6a1e535e91c3c8cad831" +dependencies = [ + "bindgen", + "cmake", + "cpp_build", + "once_cell", + "openssl-src", +] + [[package]] name = "deranged" version = "0.3.11" @@ -580,6 +685,17 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "derive_arbitrary" version = "1.3.2" @@ -874,6 +990,12 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.3.26" @@ -963,6 +1085,15 @@ dependencies = [ "digest", ] +[[package]] +name = "home" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "hostname" version = "0.3.1" @@ -1323,6 +1454,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.154" @@ -1461,6 +1598,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "minisign-verify" version = "0.2.1" @@ -1511,6 +1654,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27b02d87554356db9e9a873add8782d4ea6e3e58ea071a9adb9a2e8ddb884a8b" +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1788,6 +1941,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "prettyplease" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" +dependencies = [ + "proc-macro2", + "syn 2.0.60", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -2042,6 +2205,12 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.4.0" @@ -2345,6 +2514,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -2847,9 +3022,12 @@ name = "tx5-connection" version = "0.1.4-beta" dependencies = [ "bit_field", + "datachannel", "futures", "rand", "sbd-server", + "serde", + "serde_json", "tokio", "tracing", "tracing-subscriber", @@ -2998,6 +3176,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "untrusted" version = "0.9.0" @@ -3194,6 +3378,28 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "webrtc-sdp" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a87d58624aae43577604ea137de9dcaf92793eccc4d816efad482001c2e055ca" +dependencies = [ + "log", + "url", +] + +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + [[package]] name = "widestring" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index 1f78cd40..8d22a56b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ bit_field = "0.10.2" bytes = "1.4.0" clap = { version = "4.4.6", features = [ "derive", "wrap_help" ] } criterion = { version = "0.5.1", features = [ "async_tokio" ] } +datachannel = { version = "0.13.1", default-features = false, features = [ "tracing", "vendored" ] } dirs = "5.0.0" dunce = "1.0.3" futures = "0.3.28" diff --git a/Makefile b/Makefile index 580917af..6734163f 100644 --- a/Makefile +++ b/Makefile @@ -77,13 +77,17 @@ test: static unit unit: cargo build --all-targets - RUST_BACKTRACE=1 RUST_LOG=error cargo test -- --nocapture + RUST_BACKTRACE=1 RUST_LOG=info cargo test -- --nocapture + #--TODO--Once libdatachannel is the default, we'll want to keep + # go-pion tested for a while until we're ready to deprecate it + #RUST_BACKTRACE=1 RUST_LOG=info cargo test --no-default-features --features backend-go-pion --manifest-path crates/tx5-connection/Cargo.toml -- --nocapture + #RUST_BACKTRACE=1 RUST_LOG=info cargo test --no-default-features --features backend-go-pion --manifest-path crates/tx5/Cargo.toml -- --nocapture static: dep fmt lint docs @if [ "${CI}x" != "x" ]; then git diff --exit-code; fi lint: - cargo clippy -- -Dwarnings + cargo clippy --features backend-go-pion,backend-libdatachannel -- -Dwarnings dep: @#uhhh... better way to do this? depend on cargo-tree? diff --git a/android-build-tests.bash b/android-build-tests.bash index 4a71ed43..50b81f40 100755 --- a/android-build-tests.bash +++ b/android-build-tests.bash @@ -41,7 +41,7 @@ export TARGET_AR="${_ndk_root}/bin/llvm-ar" export TARGET_RANLIB="${_ndk_root}/bin/llvm-ranlib" export CGO_CFLAGS="-I${_ndk_root}/sysroot/usr/include -I${_ndk_root}/sysroot/usr/include/${ANDROID_ARCH}-linux-android" -cargo test --no-run --target ${ANDROID_ARCH}-linux-android --config target.${ANDROID_ARCH}-linux-android.linker="\"${_ndk_root}/bin/${ANDROID_ARCH}-linux-android34-clang\"" --config target.${ANDROID_ARCH}-linux-android.ar="\"${_ndk_root}/bin/llvm-ar\"" 2>&1 | tee output-cargo-test +cargo test --manifest-path crates/tx5/Cargo.toml --no-default-features --features backend-go-pion --no-run --target ${ANDROID_ARCH}-linux-android --config target.${ANDROID_ARCH}-linux-android.linker="\"${_ndk_root}/bin/${ANDROID_ARCH}-linux-android34-clang\"" --config target.${ANDROID_ARCH}-linux-android.ar="\"${_ndk_root}/bin/llvm-ar\"" 2>&1 | tee output-cargo-test cat output-cargo-test | grep Executable | sed -E 's/[^(]*\(([^)]*)\)/\1/' > output-test-executables echo "BUILD TESTS:" cat output-test-executables diff --git a/crates/tx5-connection/Cargo.toml b/crates/tx5-connection/Cargo.toml index acf05fb4..f72250cf 100644 --- a/crates/tx5-connection/Cargo.toml +++ b/crates/tx5-connection/Cargo.toml @@ -13,6 +13,9 @@ edition = "2021" [features] default = [ "backend-go-pion" ] +# use the libdatachannel crate as the webrtc backend +backend-libdatachannel = [ "dep:datachannel" ] + # use the tx5-go-pion crate as the webrtc backend backend-go-pion = [ "dep:tx5-go-pion" ] @@ -21,7 +24,10 @@ backend-webrtc-rs = [ ] [dependencies] bit_field = { workspace = true } +datachannel = { workspace = true, optional = true } futures = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } tokio = { workspace = true, features = [ "full" ] } tracing = { workspace = true } tx5-core = { workspace = true } diff --git a/crates/tx5-connection/src/config.rs b/crates/tx5-connection/src/config.rs new file mode 100644 index 00000000..c2784ebf --- /dev/null +++ b/crates/tx5-connection/src/config.rs @@ -0,0 +1,33 @@ +use std::sync::Arc; + +/// The backend webrtc module to use. +#[derive(Debug, Clone, Copy)] +pub enum BackendModule { + /// Use the libdatachannel backend. + #[cfg(feature = "backend-libdatachannel")] + LibDataChannel, + + /// Use the go pion backend. + #[cfg(feature = "backend-go-pion")] + GoPion, +} + +impl Default for BackendModule { + #[allow(unreachable_code)] + fn default() -> Self { + #[cfg(feature = "backend-libdatachannel")] + return Self::LibDataChannel; + #[cfg(feature = "backend-go-pion")] + Self::GoPion + } +} + +/// Tx5 connection hub config. +#[derive(Default)] +pub struct HubConfig { + /// The backend webrtc module to use. + pub backend_module: BackendModule, + + /// The signal config to use. + pub signal_config: Arc, +} diff --git a/crates/tx5-connection/src/conn.rs b/crates/tx5-connection/src/conn.rs index 81ca31c3..a0550f8a 100644 --- a/crates/tx5-connection/src/conn.rs +++ b/crates/tx5-connection/src/conn.rs @@ -76,7 +76,7 @@ impl Conn { is_polite: bool, pub_key: PubKey, client: Weak, - config: Arc, + config: Arc, hub_cmd_send: tokio::sync::mpsc::Sender, ) -> (Arc, ConnRecv, CloseSend) { netaudit!( @@ -99,7 +99,7 @@ impl Conn { let (mut msg_send, msg_recv) = CloseSend::channel(); let (cmd_send, mut cmd_recv) = CloseSend::channel(); - let keepalive_dur = config.max_idle / 2; + let keepalive_dur = config.signal_config.max_idle / 2; let client2 = client.clone(); let pub_key2 = pub_key.clone(); let keepalive_task = tokio::task::spawn(async move { @@ -188,7 +188,12 @@ impl Conn { Result::Ok(()) }; - match tokio::time::timeout(config.max_idle, handshake_fut).await { + match tokio::time::timeout( + config.signal_config.max_idle, + handshake_fut, + ) + .await + { Err(_) | Ok(Err(_)) => { client.close_peer(&pub_key2).await; return; @@ -201,7 +206,8 @@ impl Conn { // closing the semaphore causes all the acquire awaits to end ready2.close(); - let (webrtc, mut webrtc_recv) = webrtc::Webrtc::new( + let (webrtc, mut webrtc_recv) = webrtc::new_backend_module( + config.backend_module, is_polite, webrtc_config, // MAYBE - make this configurable @@ -324,9 +330,11 @@ impl Conn { let mut send_over_webrtc = false; loop { - let cmd = - tokio::time::timeout(config.max_idle, cmd_recv.recv()) - .await; + let cmd = tokio::time::timeout( + config.signal_config.max_idle, + cmd_recv.recv(), + ) + .await; let cmd = match cmd { Err(_) => { diff --git a/crates/tx5-connection/src/hub.rs b/crates/tx5-connection/src/hub.rs index 43336db4..fa8294f9 100644 --- a/crates/tx5-connection/src/hub.rs +++ b/crates/tx5-connection/src/hub.rs @@ -29,7 +29,7 @@ async fn hub_map_assert( pub_key: PubKey, map: &mut HubMap, client: &Arc, - config: &Arc, + config: &Arc, hub_cmd_send: &tokio::sync::mpsc::Sender, ) -> Result<(Option, Arc, CloseSend)> { let mut found_during_prune = None; @@ -120,12 +120,15 @@ impl Hub { pub async fn new( webrtc_config: Vec, url: &str, - config: Arc, + config: Arc, ) -> Result<(Self, HubRecv)> { let webrtc_config = Arc::new(Mutex::new(webrtc_config)); - let (client, mut recv) = - tx5_signal::SignalConnection::connect(url, config.clone()).await?; + let (client, mut recv) = tx5_signal::SignalConnection::connect( + url, + config.signal_config.clone(), + ) + .await?; let client = Arc::new(client); tracing::debug!(%url, pub_key = ?client.pub_key(), "hub connected"); diff --git a/crates/tx5-connection/src/lib.rs b/crates/tx5-connection/src/lib.rs index 45133093..3e392d86 100644 --- a/crates/tx5-connection/src/lib.rs +++ b/crates/tx5-connection/src/lib.rs @@ -24,14 +24,7 @@ //! make sure the backend doesn't change out from under you, set //! no-default-features and explicitly enable the backend of your choice. -#[cfg(any( - not(any(feature = "backend-go-pion", feature = "backend-webrtc-rs")), - all(feature = "backend-go-pion", feature = "backend-webrtc-rs"), -))] -compile_error!("Must specify exactly 1 webrtc backend"); - -#[cfg(feature = "backend-go-pion")] -pub use tx5_go_pion::Tx5InitConfig; +pub use tx5_core::Tx5InitConfig; macro_rules! breakable_timeout { ($($t:tt)*) => { @@ -100,7 +93,11 @@ impl Drop for CloseSend { impl CloseSend { pub fn channel() -> (Self, CloseRecv) { - let (s, r) = futures::channel::mpsc::channel(32); + Self::sized_channel(32) + } + + pub fn sized_channel(size: usize) -> (Self, CloseRecv) { + let (s, r) = futures::channel::mpsc::channel(size); ( Self { sender: Arc::new(Mutex::new(Some(s))), @@ -114,6 +111,17 @@ impl CloseSend { self.close_on_drop = close_on_drop; } + #[allow(dead_code)] // only used in libdatachannel backend + pub fn send_or_close(&self, t: T) { + let mut lock = self.sender.lock().unwrap(); + if let Some(sender) = &mut *lock { + if sender.try_send(t).is_err() { + sender.close_channel(); + *lock = None; + } + } + } + pub fn send( &self, t: T, @@ -130,6 +138,7 @@ impl CloseSend { } } + #[allow(dead_code)] // only used in go_pion backend pub fn send_slow_app( &self, t: T, @@ -162,6 +171,9 @@ impl CloseSend { } } +mod config; +pub use config::*; + mod webrtc; mod hub; diff --git a/crates/tx5-connection/src/test.rs b/crates/tx5-connection/src/test.rs index 98422d55..062b6395 100644 --- a/crates/tx5-connection/src/test.rs +++ b/crates/tx5-connection/src/test.rs @@ -7,6 +7,7 @@ fn init_tracing() { ) .with_file(true) .with_line_number(true) + .with_target(true) .finish(); let _ = tracing::subscriber::set_global_default(subscriber); } @@ -28,15 +29,30 @@ impl TestSrv { } pub async fn hub(&self) -> (Hub, HubRecv) { + let _ = tx5_core::Tx5InitConfig { + tracing_enabled: true, + ..Default::default() + } + .set_as_global_default(); + for addr in self.server.bind_addrs() { if let Ok(r) = Hub::new( - b"{}".to_vec(), + br#"{ + "iceServers": [ + { "urls": ["stun:stun.l.google.com:80"] }, + { "urls": ["stun:stun1.l.google.com:80"] } + ] + }"# + .to_vec(), &format!("ws://{addr}"), - Arc::new(tx5_signal::SignalConfig { - listener: true, - allow_plain_text: true, - max_idle: std::time::Duration::from_secs(1), - ..Default::default() + Arc::new(HubConfig { + backend_module: BackendModule::default(), + signal_config: Arc::new(tx5_signal::SignalConfig { + listener: true, + allow_plain_text: true, + max_idle: std::time::Duration::from_secs(1), + ..Default::default() + }), }), ) .await diff --git a/crates/tx5-connection/src/webrtc.rs b/crates/tx5-connection/src/webrtc.rs index f0528807..605221b0 100644 --- a/crates/tx5-connection/src/webrtc.rs +++ b/crates/tx5-connection/src/webrtc.rs @@ -1,5 +1,7 @@ -use crate::{AbortTask, CloseRecv, CloseSend}; -use std::io::{Error, Result}; +use crate::{BackendModule, CloseRecv}; +use futures::future::BoxFuture; +use std::io::Result; +use std::sync::Arc; pub enum WebrtcEvt { GeneratedOffer(Vec), @@ -9,265 +11,35 @@ pub enum WebrtcEvt { Ready, } -enum Cmd { - InOffer(Vec), - InAnswer(Vec), - InIce(Vec), - GeneratedIce(Vec), - DataChan( - tx5_go_pion::DataChannel, - tokio::sync::mpsc::UnboundedReceiver, - ), - SendMessage(Vec, tokio::sync::oneshot::Sender<()>), - RecvMessage(Vec), - DataChanOpen, - BufferedAmountLow, -} - -pub struct Webrtc { - cmd_send: CloseSend, - _task: AbortTask>, - _evt_send: CloseSend, +pub trait Webrtc: 'static + Send + Sync { + fn in_offer(&self, offer: Vec) -> BoxFuture<'_, Result<()>>; + fn in_answer(&self, answer: Vec) -> BoxFuture<'_, Result<()>>; + fn in_ice(&self, ice: Vec) -> BoxFuture<'_, Result<()>>; + fn message(&self, message: Vec) -> BoxFuture<'_, Result<()>>; } -impl Webrtc { - pub fn new( - is_polite: bool, - config: Vec, - send_buffer: usize, - ) -> (Self, CloseRecv) { - let (mut cmd_send, cmd_recv) = CloseSend::channel(); - let (mut evt_send, evt_recv) = CloseSend::channel(); - - let task = tokio::task::spawn(task( - is_polite, - config, - send_buffer, - evt_send.clone(), - cmd_send.clone(), - cmd_recv, - )); +pub type DynWebrtc = Arc; - cmd_send.set_close_on_drop(true); - evt_send.set_close_on_drop(true); +#[cfg(feature = "backend-libdatachannel")] +mod libdatachannel; - ( - Self { - cmd_send, - _task: AbortTask(task), - _evt_send: evt_send, - }, - evt_recv, - ) - } - - pub async fn in_offer(&self, offer: Vec) -> Result<()> { - self.cmd_send - .send(Cmd::InOffer(offer)) - .await - .map_err(|_| Error::other("closed")) - } +#[cfg(feature = "backend-go-pion")] +mod go_pion; - pub async fn in_answer(&self, answer: Vec) -> Result<()> { - self.cmd_send - .send(Cmd::InAnswer(answer)) - .await - .map_err(|_| Error::other("closed")) - } - - pub async fn in_ice(&self, ice: Vec) -> Result<()> { - self.cmd_send - .send(Cmd::InIce(ice)) - .await - .map_err(|_| Error::other("closed")) - } - - pub async fn message(&self, message: Vec) -> Result<()> { - let (s, r) = tokio::sync::oneshot::channel(); - self.cmd_send - .send(Cmd::SendMessage(message, s)) - .await - .map_err(|_| Error::other("closed"))?; - let _ = r.await; - Ok(()) - } -} - -async fn task( +pub fn new_backend_module( + module: BackendModule, is_polite: bool, config: Vec, send_buffer: usize, - mut evt_send: CloseSend, - cmd_send: CloseSend, - mut cmd_recv: CloseRecv, -) -> Result<()> { - evt_send.set_close_on_drop(true); - - let (peer, mut peer_evt) = tx5_go_pion::PeerConnection::new(config).await?; - - let mut cmd_send2 = cmd_send.clone(); - let _peer_task: AbortTask> = - AbortTask(tokio::task::spawn(async move { - cmd_send2.set_close_on_drop(true); - - use tx5_go_pion::PeerConnectionEvent as Evt; - while let Some(evt) = peer_evt.recv().await { - match evt { - Evt::Error(_) => break, - Evt::State(_) => (), - Evt::ICECandidate(mut ice) => { - cmd_send2 - .send(Cmd::GeneratedIce(ice.to_vec()?)) - .await?; - } - Evt::DataChannel(d, dr) => { - cmd_send2.send(Cmd::DataChan(d, dr)).await?; - } - } - } - Ok(()) - })); - - let mut offer = None; - let mut data = None; - let mut _data_recv = None; - let mut did_handshake = false; - let mut pend_buffer = Vec::new(); - - if !is_polite { - let (d, dr) = peer - .create_data_channel(b"{\"label\":\"data\"}".to_vec()) - .await?; - d.set_buffered_amount_low_threshold(send_buffer)?; - data = Some(d); - _data_recv = spawn_data_chan(cmd_send.clone(), dr); - let mut o = peer.create_offer(b"{}".to_vec()).await?; - evt_send - .send(WebrtcEvt::GeneratedOffer(o.to_vec()?)) - .await?; - offer = Some(o); - } - - loop { - let cmd = match cmd_recv.recv().await { - None => break, - Some(cmd) => cmd, - }; - - let mut slow_task = "unknown"; - - match breakable_timeout!(match cmd { - Cmd::InOffer(o) => { - slow_task = "in-offer"; - if is_polite && !did_handshake { - peer.set_remote_description(o).await?; - let mut a = peer.create_answer(b"{}".to_vec()).await?; - evt_send - .send(WebrtcEvt::GeneratedAnswer(a.to_vec()?)) - .await?; - peer.set_local_description(a).await?; - did_handshake = true; - } - } - Cmd::InAnswer(a) => { - slow_task = "in-answer"; - if !is_polite && !did_handshake { - if let Some(o) = offer.take() { - peer.set_local_description(o).await?; - peer.set_remote_description(a).await?; - did_handshake = true; - } - } - } - Cmd::InIce(i) => { - slow_task = "in-ice"; - let _ = peer.add_ice_candidate(i).await; - } - Cmd::GeneratedIce(ice) => { - slow_task = "gen-ice"; - evt_send.send(WebrtcEvt::GeneratedIce(ice)).await?; - } - Cmd::DataChan(d, dr) => { - slow_task = "data-chan"; - if data.is_none() { - d.set_buffered_amount_low_threshold(send_buffer)?; - data = Some(d); - _data_recv = spawn_data_chan(cmd_send.clone(), dr); - } - } - Cmd::SendMessage(msg, resp) => { - slow_task = "send-msg"; - if let Some(d) = &data { - let amt = match d.send(msg).await { - Ok(amt) => amt, - Err(_) => break, - }; - if amt <= send_buffer { - drop(resp); - pend_buffer.clear(); - } else { - pend_buffer.push(resp); - } - } else { - break; - } - } - Cmd::RecvMessage(msg) => { - slow_task = "recv-msg"; - evt_send.send(WebrtcEvt::Message(msg)).await?; - } - Cmd::DataChanOpen => { - slow_task = "chan-open"; - evt_send.send(WebrtcEvt::Ready).await?; - } - Cmd::BufferedAmountLow => { - slow_task = "buf-low"; - pend_buffer.clear(); - } - }) { - Err(_) => { - let err = format!("slow app on webrtc loop task: {slow_task}"); - tracing::warn!("{err}"); - Err(Error::other(err)) - } - Ok(r) => r, - }?; - } - - Ok(()) -} - -fn spawn_data_chan( - mut cmd_send: CloseSend, - mut data_recv: tokio::sync::mpsc::UnboundedReceiver< - tx5_go_pion::DataChannelEvent, - >, -) -> Option>> { - use tx5_go_pion::DataChannelEvent as Evt; - Some(AbortTask(tokio::task::spawn(async move { - cmd_send.set_close_on_drop(true); - - // Receiving on the unbounded data channel receiver has a real - // chance to fill our memory with message data. - // We give a small chance for the app to catch up, otherwise - // error so the connection will close. - while let Some(evt) = data_recv.recv().await { - match evt { - Evt::Error(_) => break, - Evt::Open => { - cmd_send.send_slow_app(Cmd::DataChanOpen).await?; - } - Evt::Close => break, - Evt::Message(mut msg) => { - cmd_send - .send_slow_app(Cmd::RecvMessage(msg.to_vec()?)) - .await?; - } - Evt::BufferedAmountLow => { - cmd_send.send_slow_app(Cmd::BufferedAmountLow).await?; - } - } +) -> (DynWebrtc, CloseRecv) { + match module { + #[cfg(feature = "backend-libdatachannel")] + BackendModule::LibDataChannel => { + libdatachannel::Webrtc::new(is_polite, config, send_buffer) + } + #[cfg(feature = "backend-go-pion")] + BackendModule::GoPion => { + go_pion::Webrtc::new(is_polite, config, send_buffer) } - Ok(()) - }))) + } } diff --git a/crates/tx5-connection/src/webrtc/go_pion.rs b/crates/tx5-connection/src/webrtc/go_pion.rs new file mode 100644 index 00000000..7f2c7698 --- /dev/null +++ b/crates/tx5-connection/src/webrtc/go_pion.rs @@ -0,0 +1,276 @@ +use super::*; +use crate::{AbortTask, CloseRecv, CloseSend}; +use std::io::{Error, Result}; + +enum Cmd { + InOffer(Vec), + InAnswer(Vec), + InIce(Vec), + GeneratedIce(Vec), + DataChan( + tx5_go_pion::DataChannel, + tokio::sync::mpsc::UnboundedReceiver, + ), + SendMessage(Vec, tokio::sync::oneshot::Sender<()>), + RecvMessage(Vec), + DataChanOpen, + BufferedAmountLow, +} + +pub struct Webrtc { + cmd_send: CloseSend, + _task: AbortTask>, + _evt_send: CloseSend, +} + +impl Webrtc { + #[allow(clippy::new_ret_no_self)] + pub fn new( + is_polite: bool, + config: Vec, + send_buffer: usize, + ) -> (DynWebrtc, CloseRecv) { + let (mut cmd_send, cmd_recv) = CloseSend::channel(); + let (mut evt_send, evt_recv) = CloseSend::channel(); + + let task = tokio::task::spawn(task( + is_polite, + config, + send_buffer, + evt_send.clone(), + cmd_send.clone(), + cmd_recv, + )); + + cmd_send.set_close_on_drop(true); + evt_send.set_close_on_drop(true); + + let this: DynWebrtc = Arc::new(Self { + cmd_send, + _task: AbortTask(task), + _evt_send: evt_send, + }); + + (this, evt_recv) + } +} + +impl super::Webrtc for Webrtc { + fn in_offer(&self, offer: Vec) -> BoxFuture<'_, Result<()>> { + Box::pin(async move { + self.cmd_send + .send(Cmd::InOffer(offer)) + .await + .map_err(|_| Error::other("closed")) + }) + } + + fn in_answer(&self, answer: Vec) -> BoxFuture<'_, Result<()>> { + Box::pin(async move { + self.cmd_send + .send(Cmd::InAnswer(answer)) + .await + .map_err(|_| Error::other("closed")) + }) + } + + fn in_ice(&self, ice: Vec) -> BoxFuture<'_, Result<()>> { + Box::pin(async move { + self.cmd_send + .send(Cmd::InIce(ice)) + .await + .map_err(|_| Error::other("closed")) + }) + } + + fn message(&self, message: Vec) -> BoxFuture<'_, Result<()>> { + Box::pin(async move { + let (s, r) = tokio::sync::oneshot::channel(); + self.cmd_send + .send(Cmd::SendMessage(message, s)) + .await + .map_err(|_| Error::other("closed"))?; + let _ = r.await; + Ok(()) + }) + } +} + +async fn task( + is_polite: bool, + config: Vec, + send_buffer: usize, + mut evt_send: CloseSend, + cmd_send: CloseSend, + mut cmd_recv: CloseRecv, +) -> Result<()> { + evt_send.set_close_on_drop(true); + + let (peer, mut peer_evt) = tx5_go_pion::PeerConnection::new(config).await?; + + let mut cmd_send2 = cmd_send.clone(); + let _peer_task: AbortTask> = + AbortTask(tokio::task::spawn(async move { + cmd_send2.set_close_on_drop(true); + + use tx5_go_pion::PeerConnectionEvent as Evt; + while let Some(evt) = peer_evt.recv().await { + match evt { + Evt::Error(_) => break, + Evt::State(_) => (), + Evt::ICECandidate(mut ice) => { + cmd_send2 + .send(Cmd::GeneratedIce(ice.to_vec()?)) + .await?; + } + Evt::DataChannel(d, dr) => { + cmd_send2.send(Cmd::DataChan(d, dr)).await?; + } + } + } + Ok(()) + })); + + let mut offer = None; + let mut data = None; + let mut _data_recv = None; + let mut did_handshake = false; + let mut pend_buffer = Vec::new(); + + if !is_polite { + let (d, dr) = peer + .create_data_channel(b"{\"label\":\"data\"}".to_vec()) + .await?; + d.set_buffered_amount_low_threshold(send_buffer)?; + data = Some(d); + _data_recv = spawn_data_chan(cmd_send.clone(), dr); + let mut o = peer.create_offer(b"{}".to_vec()).await?; + evt_send + .send(WebrtcEvt::GeneratedOffer(o.to_vec()?)) + .await?; + offer = Some(o); + } + + loop { + let cmd = match cmd_recv.recv().await { + None => break, + Some(cmd) => cmd, + }; + + let mut slow_task = "unknown"; + + match breakable_timeout!(match cmd { + Cmd::InOffer(o) => { + slow_task = "in-offer"; + if is_polite && !did_handshake { + peer.set_remote_description(o).await?; + let mut a = peer.create_answer(b"{}".to_vec()).await?; + evt_send + .send(WebrtcEvt::GeneratedAnswer(a.to_vec()?)) + .await?; + peer.set_local_description(a).await?; + did_handshake = true; + } + } + Cmd::InAnswer(a) => { + slow_task = "in-answer"; + if !is_polite && !did_handshake { + if let Some(o) = offer.take() { + peer.set_local_description(o).await?; + peer.set_remote_description(a).await?; + did_handshake = true; + } + } + } + Cmd::InIce(i) => { + slow_task = "in-ice"; + let _ = peer.add_ice_candidate(i).await; + } + Cmd::GeneratedIce(ice) => { + slow_task = "gen-ice"; + evt_send.send(WebrtcEvt::GeneratedIce(ice)).await?; + } + Cmd::DataChan(d, dr) => { + slow_task = "data-chan"; + if data.is_none() { + d.set_buffered_amount_low_threshold(send_buffer)?; + data = Some(d); + _data_recv = spawn_data_chan(cmd_send.clone(), dr); + } + } + Cmd::SendMessage(msg, resp) => { + slow_task = "send-msg"; + if let Some(d) = &data { + let amt = match d.send(msg).await { + Ok(amt) => amt, + Err(_) => break, + }; + if amt <= send_buffer { + drop(resp); + pend_buffer.clear(); + } else { + pend_buffer.push(resp); + } + } else { + break; + } + } + Cmd::RecvMessage(msg) => { + slow_task = "recv-msg"; + evt_send.send(WebrtcEvt::Message(msg)).await?; + } + Cmd::DataChanOpen => { + slow_task = "chan-open"; + evt_send.send(WebrtcEvt::Ready).await?; + } + Cmd::BufferedAmountLow => { + slow_task = "buf-low"; + pend_buffer.clear(); + } + }) { + Err(_) => { + let err = format!("slow app on webrtc loop task: {slow_task}"); + tracing::warn!("{err}"); + Err(Error::other(err)) + } + Ok(r) => r, + }?; + } + + Ok(()) +} + +fn spawn_data_chan( + mut cmd_send: CloseSend, + mut data_recv: tokio::sync::mpsc::UnboundedReceiver< + tx5_go_pion::DataChannelEvent, + >, +) -> Option>> { + use tx5_go_pion::DataChannelEvent as Evt; + Some(AbortTask(tokio::task::spawn(async move { + cmd_send.set_close_on_drop(true); + + // Receiving on the unbounded data channel receiver has a real + // chance to fill our memory with message data. + // We give a small chance for the app to catch up, otherwise + // error so the connection will close. + while let Some(evt) = data_recv.recv().await { + match evt { + Evt::Error(_) => break, + Evt::Open => { + cmd_send.send_slow_app(Cmd::DataChanOpen).await?; + } + Evt::Close => break, + Evt::Message(mut msg) => { + cmd_send + .send_slow_app(Cmd::RecvMessage(msg.to_vec()?)) + .await?; + } + Evt::BufferedAmountLow => { + cmd_send.send_slow_app(Cmd::BufferedAmountLow).await?; + } + } + } + Ok(()) + }))) +} diff --git a/crates/tx5-connection/src/webrtc/libdatachannel.rs b/crates/tx5-connection/src/webrtc/libdatachannel.rs new file mode 100644 index 00000000..cb7ef764 --- /dev/null +++ b/crates/tx5-connection/src/webrtc/libdatachannel.rs @@ -0,0 +1,382 @@ +use super::*; +use crate::{AbortTask, CloseRecv, CloseSend}; +use std::io::{Error, Result}; + +type MapErr = Box F>; +fn map_err(s: &'static str) -> MapErr { + Box::new(move |e| std::io::Error::other(format!("{s}: {e:?}"))) +} + +enum Cmd { + InOffer(Vec), + InAnswer(Vec), + InIce(Vec), + GeneratedIce(datachannel::IceCandidate), + DataChan(Box>), + SendMessage(Vec, tokio::sync::oneshot::Sender<()>), + RecvMessage(Vec), + RecvDescription(Box), + DataChanOpen, + BufferedAmountLow, + Error(std::io::Error), +} + +struct Dch(CloseSend); + +impl datachannel::DataChannelHandler for Dch { + fn on_open(&mut self) { + self.0.send_or_close(Cmd::DataChanOpen); + } + + fn on_closed(&mut self) { + self.0 + .send_or_close(Cmd::Error(std::io::Error::other("DataChanClosed"))); + } + + fn on_error(&mut self, err: &str) { + self.0 + .send_or_close(Cmd::Error(std::io::Error::other(format!( + "DataChanError: {err}" + )))); + } + + fn on_message(&mut self, msg: &[u8]) { + self.0.send_or_close(Cmd::RecvMessage(msg.to_vec())); + } + + fn on_buffered_amount_low(&mut self) { + self.0.send_or_close(Cmd::BufferedAmountLow); + } + + /* + fn on_available(&mut self) { + // TODO - figure out what this is + } + */ +} + +struct Pch(CloseSend); + +impl datachannel::PeerConnectionHandler for Pch { + type DCH = Dch; + + fn data_channel_handler( + &mut self, + _info: datachannel::DataChannelInfo, + ) -> Self::DCH { + Dch(self.0.clone()) + } + + fn on_description(&mut self, sess_desc: datachannel::SessionDescription) { + self.0 + .send_or_close(Cmd::RecvDescription(Box::new(sess_desc))); + } + + fn on_candidate(&mut self, cand: datachannel::IceCandidate) { + self.0.send_or_close(Cmd::GeneratedIce(cand)); + } + + fn on_data_channel( + &mut self, + data_channel: Box>, + ) { + self.0.send_or_close(Cmd::DataChan(data_channel)); + } +} + +pub struct Webrtc { + cmd_send: CloseSend, + _task: AbortTask<()>, + _evt_send: CloseSend, +} + +impl Webrtc { + #[allow(clippy::new_ret_no_self)] + #[allow(clippy::needless_return)] + pub fn new( + is_polite: bool, + config: Vec, + send_buffer: usize, + ) -> (DynWebrtc, CloseRecv) { + static INIT_TRACING: std::sync::Once = std::sync::Once::new(); + INIT_TRACING.call_once(|| { + use tracing::event_enabled; + use tracing::Level; + + if !tx5_core::Tx5InitConfig::get().tracing_enabled { + return; + } + + if event_enabled!(target: "datachannel", Level::TRACE) { + datachannel::configure_logging(Level::TRACE); + return; + } + if event_enabled!(target: "datachannel", Level::DEBUG) { + datachannel::configure_logging(Level::DEBUG); + return; + } + if event_enabled!(target: "datachannel", Level::INFO) { + datachannel::configure_logging(Level::INFO); + return; + } + if event_enabled!(target: "datachannel", Level::WARN) { + datachannel::configure_logging(Level::WARN); + return; + } + if event_enabled!(target: "datachannel", Level::ERROR) { + datachannel::configure_logging(Level::ERROR); + return; + } + }); + + let (mut cmd_send, cmd_recv) = CloseSend::sized_channel(1024); + let (mut evt_send, evt_recv) = CloseSend::sized_channel(1024); + + let task = tokio::task::spawn(task( + is_polite, + config, + send_buffer, + evt_send.clone(), + cmd_send.clone(), + cmd_recv, + )); + + cmd_send.set_close_on_drop(true); + evt_send.set_close_on_drop(true); + + let this: DynWebrtc = Arc::new(Self { + cmd_send, + _task: AbortTask(task), + _evt_send: evt_send, + }); + + (this, evt_recv) + } +} + +impl super::Webrtc for Webrtc { + fn in_offer(&self, offer: Vec) -> BoxFuture<'_, Result<()>> { + Box::pin(async move { + self.cmd_send + .send(Cmd::InOffer(offer)) + .await + .map_err(|_| Error::other("closed")) + }) + } + + fn in_answer(&self, answer: Vec) -> BoxFuture<'_, Result<()>> { + Box::pin(async move { + self.cmd_send + .send(Cmd::InAnswer(answer)) + .await + .map_err(|_| Error::other("closed")) + }) + } + + fn in_ice(&self, ice: Vec) -> BoxFuture<'_, Result<()>> { + Box::pin(async move { + self.cmd_send + .send(Cmd::InIce(ice)) + .await + .map_err(|_| Error::other("closed")) + }) + } + + fn message(&self, message: Vec) -> BoxFuture<'_, Result<()>> { + Box::pin(async move { + let (s, r) = tokio::sync::oneshot::channel(); + self.cmd_send + .send(Cmd::SendMessage(message, s)) + .await + .map_err(|_| Error::other("closed"))?; + let _ = r.await; + Ok(()) + }) + } +} + +async fn task( + is_polite: bool, + config: Vec, + send_buffer: usize, + evt_send: CloseSend, + cmd_send: CloseSend, + cmd_recv: CloseRecv, +) { + if let Err(err) = + task_err(is_polite, config, send_buffer, evt_send, cmd_send, cmd_recv) + .await + { + tracing::warn!(?err, "webrtc task error"); + } +} + +async fn task_err( + is_polite: bool, + config: Vec, + send_buffer: usize, + mut evt_send: CloseSend, + cmd_send: CloseSend, + mut cmd_recv: CloseRecv, +) -> Result<()> { + evt_send.set_close_on_drop(true); + + #[derive(Debug, Default, serde::Deserialize)] + #[serde(rename_all = "camelCase")] + struct U { + pub urls: Vec, + } + + #[derive(Debug, serde::Deserialize)] + #[serde(rename_all = "camelCase")] + struct C { + #[serde(default)] + pub ice_servers: Vec, + } + + let mut ice = Vec::new(); + match serde_json::from_slice::(&config) { + Ok(mut c) => { + for mut u in c.ice_servers.drain(..) { + ice.append(&mut u.urls); + } + } + Err(err) => tracing::error!(?err, "failed to parse iceServers"), + } + + let init_config = tx5_core::Tx5InitConfig::get(); + + // TODO - max_message_size? + let config = datachannel::RtcConfig::new::(&ice) + .port_range_begin(init_config.ephemeral_udp_port_min) + .port_range_end(init_config.ephemeral_udp_port_max); + let mut peer = + datachannel::RtcPeerConnection::new(&config, Pch(cmd_send.clone())) + .map_err(map_err("constructing peer connection"))?; + + let mut data = None; + let mut did_handshake = false; + let mut pend_buffer = Vec::new(); + + if !is_polite { + let mut d = peer + .create_data_channel("data", Dch(cmd_send.clone())) + .map_err(map_err("creating data channel"))?; + d.set_buffered_amount_low_threshold(send_buffer) + .map_err(map_err("setting buffer low threshold (out)"))?; + data = Some(d); + peer.set_local_description(datachannel::SdpType::Offer) + .map_err(map_err("setting local desc to offer"))?; + } + + loop { + let cmd = match cmd_recv.recv().await { + None => break, + Some(cmd) => cmd, + }; + + match cmd { + Cmd::InOffer(o) => { + if is_polite && !did_handshake { + let o: datachannel::SessionDescription = + serde_json::from_slice(&o) + .map_err(map_err("deserializing remote offer"))?; + peer.set_remote_description(&o) + .map_err(map_err("setting remote offer desc"))?; + // NOTE: I guess this auto-answers?? + // We get a Runtime error if we call this explicitly: + //peer.set_local_description(datachannel::SdpType::Answer) + // .map_err(map_err("setting local desc to answer"))?; + did_handshake = true; + } + } + Cmd::InAnswer(a) => { + if !is_polite && !did_handshake { + let a: datachannel::SessionDescription = + serde_json::from_slice(&a) + .map_err(map_err("deserializing remote answer"))?; + peer.set_remote_description(&a) + .map_err(map_err("setting remote answer desc"))?; + did_handshake = true; + } + } + Cmd::InIce(i) => { + let i: datachannel::IceCandidate = + serde_json::from_slice(&i) + .map_err(map_err("deserializing remote candidate"))?; + if let Err(err) = peer + .add_remote_candidate(&i) + .map_err(map_err("adding remote candidate")) + { + // Don't error on ice candidates, it might be from + // a previous negotiation, just note it in the trace + tracing::warn!(?err, "failed to add remote candidate"); + } + } + Cmd::GeneratedIce(ice) => { + evt_send + .send(WebrtcEvt::GeneratedIce( + serde_json::to_string(&ice)?.into_bytes(), + )) + .await?; + } + Cmd::DataChan(mut d) => { + if data.is_none() { + d.set_buffered_amount_low_threshold(send_buffer).map_err( + map_err("setting buffer low threshold (in)"), + )?; + data = Some(d); + } else { + return Err(std::io::Error::other("duplicate data chan")); + } + } + Cmd::SendMessage(msg, resp) => { + if let Some(d) = &mut data { + d.send(&msg).map_err(map_err("sending message"))?; + let amt = d.buffered_amount(); + if amt <= send_buffer { + drop(resp); + pend_buffer.clear(); + } else { + pend_buffer.push(resp); + } + } else { + break; + } + } + Cmd::RecvMessage(msg) => { + evt_send.send(WebrtcEvt::Message(msg)).await?; + } + Cmd::RecvDescription(desc) => match desc.sdp_type { + datachannel::SdpType::Offer => { + evt_send + .send(WebrtcEvt::GeneratedOffer( + serde_json::to_string(&desc)?.into_bytes(), + )) + .await?; + } + datachannel::SdpType::Answer => { + evt_send + .send(WebrtcEvt::GeneratedAnswer( + serde_json::to_string(&desc)?.into_bytes(), + )) + .await?; + } + _ => { + return Err(std::io::Error::other( + "unhandled sdp desc type", + )) + } + }, + Cmd::DataChanOpen => { + evt_send.send(WebrtcEvt::Ready).await?; + } + Cmd::BufferedAmountLow => { + pend_buffer.clear(); + } + Cmd::Error(err) => return Err(err), + } + } + + Ok(()) +} diff --git a/crates/tx5/Cargo.toml b/crates/tx5/Cargo.toml index 4b2238da..9383de1d 100644 --- a/crates/tx5/Cargo.toml +++ b/crates/tx5/Cargo.toml @@ -13,6 +13,9 @@ categories = ["network-programming"] [features] default = [ "backend-go-pion" ] +# use the libdatachannel crate as the webrtc backend +backend-libdatachannel = [ "tx5-connection/backend-libdatachannel" ] + # use the tx5-go-pion crate as the webrtc backend backend-go-pion = [ "tx5-connection/backend-go-pion" ] diff --git a/crates/tx5/src/backend.rs b/crates/tx5/src/backend.rs index 2cc79b36..d2d54ccd 100644 --- a/crates/tx5/src/backend.rs +++ b/crates/tx5/src/backend.rs @@ -8,14 +8,18 @@ use futures::future::BoxFuture; use crate::{Config, PubKey}; use tx5_core::deps::serde_json; -#[cfg(feature = "backend-go-pion")] -mod go_pion; +#[cfg(any(feature = "backend-go-pion", feature = "backend-libdatachannel"))] +mod be_tx5_connection; mod mem; /// Backend modules usable by tx5. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum BackendModule { + #[cfg(feature = "backend-libdatachannel")] + /// The libdatachannel-based backend. + LibDataChannel, + #[cfg(feature = "backend-go-pion")] /// The Go Pion-based backend. GoPion, @@ -31,6 +35,8 @@ pub enum BackendModule { impl Default for BackendModule { #[allow(unreachable_code)] fn default() -> Self { + #[cfg(feature = "backend-libdatachannel")] + return Self::LibDataChannel; #[cfg(feature = "backend-go-pion")] return Self::GoPion; #[cfg(feature = "backend-webrtc-rs")] @@ -43,8 +49,10 @@ impl BackendModule { /// Get a default version of the module-specific config. pub fn default_config(&self) -> serde_json::Value { match self { + #[cfg(feature = "backend-libdatachannel")] + Self::LibDataChannel => be_tx5_connection::default_config(), #[cfg(feature = "backend-go-pion")] - Self::GoPion => go_pion::default_config(), + Self::GoPion => be_tx5_connection::default_config(), #[cfg(feature = "backend-webrtc-rs")] Self::WebrtcRs => todo!(), Self::Mem => mem::default_config(), @@ -59,8 +67,14 @@ impl BackendModule { config: &Arc, ) -> Result<(DynBackEp, DynBackEpRecvCon)> { match self { + #[cfg(feature = "backend-libdatachannel")] + Self::LibDataChannel => { + be_tx5_connection::connect(config, url, listener).await + } #[cfg(feature = "backend-go-pion")] - Self::GoPion => go_pion::connect(config, url, listener).await, + Self::GoPion => { + be_tx5_connection::connect(config, url, listener).await + } #[cfg(feature = "backend-webrtc-rs")] Self::WebrtcRs => todo!(), Self::Mem => mem::connect(config, url, listener).await, diff --git a/crates/tx5/src/backend/go_pion.rs b/crates/tx5/src/backend/be_tx5_connection.rs similarity index 84% rename from crates/tx5/src/backend/go_pion.rs rename to crates/tx5/src/backend/be_tx5_connection.rs index a7e8eea6..2455b8de 100644 --- a/crates/tx5/src/backend/go_pion.rs +++ b/crates/tx5/src/backend/be_tx5_connection.rs @@ -129,9 +129,27 @@ pub async fn connect( max_idle: config.timeout, ..Default::default() }; + + let backend_module = match config.backend_module { + #[cfg(feature = "backend-libdatachannel")] + BackendModule::LibDataChannel => { + tx5_connection::BackendModule::LibDataChannel + } + #[cfg(feature = "backend-go-pion")] + BackendModule::GoPion => tx5_connection::BackendModule::GoPion, + oth => { + return Err(std::io::Error::other(format!( + "unsupported backend module: {oth:?}" + ))) + } + }; + + let hub_config = Arc::new(tx5_connection::HubConfig { + backend_module, + signal_config: Arc::new(sig_config), + }); let (hub, hub_recv) = - tx5_connection::Hub::new(webrtc_config, url, Arc::new(sig_config)) - .await?; + tx5_connection::Hub::new(webrtc_config, url, hub_config).await?; let ep: DynBackEp = Arc::new(GoEp(hub)); let ep_recv: DynBackEpRecvCon = Box::new(GoEpRecvCon(hub_recv)); Ok((ep, ep_recv)) diff --git a/crates/tx5/src/ep.rs b/crates/tx5/src/ep.rs index f5f9fe95..8c638e8a 100644 --- a/crates/tx5/src/ep.rs +++ b/crates/tx5/src/ep.rs @@ -294,10 +294,15 @@ impl Endpoint { /// Get stats. pub fn get_stats(&self) -> stats::Stats { - #[cfg(feature = "backend-go-pion")] - let backend = stats::StatsBackend::BackendGoPion; - #[cfg(feature = "backend-webrtc-rs")] - let backend = stats::StatsBackend::BackendWebrtcRs; + let backend = match self.config.backend_module { + #[cfg(feature = "backend-libdatachannel")] + BackendModule::LibDataChannel => { + stats::StatsBackend::BackendLibDataChannel + } + #[cfg(feature = "backend-go-pion")] + BackendModule::GoPion => stats::StatsBackend::BackendGoPion, + BackendModule::Mem => stats::StatsBackend::BackendMem, + }; let connection_list = self .inner diff --git a/crates/tx5/src/stats.rs b/crates/tx5/src/stats.rs index 252b5303..3df56b4a 100644 --- a/crates/tx5/src/stats.rs +++ b/crates/tx5/src/stats.rs @@ -5,11 +5,17 @@ #[serde(rename_all = "camelCase")] #[non_exhaustive] pub enum StatsBackend { + /// The rust ffi bindings to the libdatachannel webrtc library. + BackendLibDataChannel, + /// The rust ffi bindings to the golang pion webrtc library. BackendGoPion, /// The rust webrtc library. BackendWebrtcRs, + + /// The mem-only stub/test backend. + BackendMem, } /// Data for an individual connection. diff --git a/crates/tx5/src/test.rs b/crates/tx5/src/test.rs index d45e5bb2..052d93ae 100644 --- a/crates/tx5/src/test.rs +++ b/crates/tx5/src/test.rs @@ -1,5 +1,7 @@ use crate::*; +const DISCON: &[u8] = b"<<>>"; + struct TestEp { ep: Endpoint, task: tokio::task::JoinHandle<()>, @@ -47,10 +49,7 @@ impl TestEp { } } EndpointEvent::Disconnected { peer_url } => { - if send - .send((peer_url, b"<<>>".to_vec())) - .is_err() - { + if send.send((peer_url, DISCON.to_vec())).is_err() { break; } } @@ -110,6 +109,12 @@ impl Test { let _ = tracing::subscriber::set_global_default(subscriber); + let _ = tx5_core::Tx5InitConfig { + tracing_enabled: true, + ..Default::default() + } + .set_as_global_default(); + let mut this = Test { sig_srv_hnd: None, sig_port: None, @@ -195,8 +200,18 @@ async fn webrtc_transition_ordering() { } }); + struct D(tokio::task::JoinHandle<()>); + + impl Drop for D { + fn drop(&mut self) { + self.0.abort(); + } + } + + let _d = D(ts1); + let tr2 = tokio::task::spawn(tokio::time::timeout( - std::time::Duration::from_secs(10), + std::time::Duration::from_secs(30), async move { // at least the first message should be passed before webrtc // can connect @@ -220,7 +235,6 @@ async fn webrtc_transition_ordering() { if !got_non_webrtc { panic!("failed to receive any pre-webrtc messages"); } - ts1.abort(); break; } else { got_non_webrtc = true;