Skip to content

Commit

Permalink
abstract tx5 backend in prep for m̶o̶c̶k̶ mem backend (#105)
Browse files Browse the repository at this point in the history
* abstract tx5 backend in prep for mock backend

* Update crates/tx5/src/backend/go_pion.rs

Co-authored-by: Callum Dunster <[email protected]>

* address code review comments

* address code review comment

---------

Co-authored-by: Callum Dunster <[email protected]>
  • Loading branch information
neonphog and cdunster authored Oct 14, 2024
1 parent 05e82a2 commit 60c2321
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 71 deletions.
1 change: 1 addition & 0 deletions crates/tx5/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ backend-webrtc-rs = [ "tx5-connection/backend-webrtc-rs" ]

[dependencies]
base64 = { workspace = true }
futures = { workspace = true }
influxive-otel-atomic-obs = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true, features = [ "full" ] }
Expand Down
134 changes: 134 additions & 0 deletions crates/tx5/src/backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//! Backend modules usable by tx5.
use std::io::Result;
use std::sync::Arc;

use futures::future::BoxFuture;

use crate::{Config, PubKey};
use tx5_core::deps::serde_json;

#[cfg(feature = "backend-go-pion")]
mod go_pion;

/// Backend modules usable by tx5.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BackendModule {
#[cfg(feature = "backend-go-pion")]
/// The Go Pion-based backend.
GoPion,

#[cfg(feature = "backend-webrtc-rs")]
/// The Webrtc-RS-based backend.
WebrtcRs,

/// The mock backend.
Mock,
}

impl Default for BackendModule {
#[allow(unreachable_code)]
fn default() -> Self {
#[cfg(feature = "backend-go-pion")]
return Self::GoPion;
#[cfg(feature = "backend-webrtc-rs")]
return Self::WebrtcRs;
Self::Mock
}
}

impl BackendModule {
/// Get a default version of the module-specific config.
pub fn default_config(&self) -> serde_json::Value {
match self {
#[cfg(feature = "backend-go-pion")]
Self::GoPion => go_pion::default_config(),
#[cfg(feature = "backend-webrtc-rs")]
Self::WebrtcRs => todo!(),
Self::Mock => serde_json::json!({}),
}
}

/// Connect a new backend module endpoint.
pub async fn connect(
&self,
url: &str,
listener: bool,
config: &Arc<Config>,
) -> Result<(DynBackEp, DynBackEpRecvCon)> {
match self {
#[cfg(feature = "backend-go-pion")]
Self::GoPion => go_pion::connect(config, url, listener).await,
#[cfg(feature = "backend-webrtc-rs")]
Self::WebrtcRs => todo!(),
Self::Mock => todo!(),
}
}
}

/// Backend connection.
pub trait BackCon: 'static + Send + Sync {
/// Send data over this backend connection.
fn send(&self, data: Vec<u8>) -> BoxFuture<'_, Result<()>>;

/// Get the pub_key identifying this connection.
fn pub_key(&self) -> &PubKey;

/// Returns `true` if we successfully connected over webrtc.
// TODO - this isn't good encapsulation
fn is_using_webrtc(&self) -> bool;

/// Get connection statistics.
// TODO - this isn't good encapsulation
fn get_stats(&self) -> tx5_connection::ConnStats;
}

/// Trait-object version of backend connection.
pub type DynBackCon = Arc<dyn BackCon + 'static + Send + Sync>;

/// Backend connection receiver.
pub trait BackConRecvData: 'static + Send {
/// Receive data from this backend connection.
fn recv(&mut self) -> BoxFuture<'_, Option<Vec<u8>>>;
}

/// Trait-object version of backend connection receiver.
pub type DynBackConRecvData = Box<dyn BackConRecvData + 'static + Send>;

/// Pending connection.
pub trait BackWaitCon: 'static + Send {
/// Wait for the connection
fn wait(
&mut self,
// TODO - this isn't good encapsulation
recv_limit: Arc<tokio::sync::Semaphore>,
) -> BoxFuture<'static, Result<(DynBackCon, DynBackConRecvData)>>;

/// Get the pub_key identifying this connection.
fn pub_key(&self) -> &PubKey;
}

/// Trait-object version of backend wait con.
pub type DynBackWaitCon = Box<dyn BackWaitCon + 'static + Send>;

/// Backend endpoint.
pub trait BackEp: 'static + Send + Sync {
/// Establish an outgoing connection from this backend endpoint.
fn connect(&self, pub_key: PubKey)
-> BoxFuture<'_, Result<DynBackWaitCon>>;

/// Get the pub_key identifying this endpoint.
fn pub_key(&self) -> &PubKey;
}

/// Trait-object version of backend endpoint.
pub type DynBackEp = Arc<dyn BackEp + 'static + Send + Sync>;

/// Backend endpoint receiver.
pub trait BackEpRecvCon: 'static + Send {
/// Receive incoming connection from this backend endpoint.
fn recv(&mut self) -> BoxFuture<'_, Option<DynBackWaitCon>>;
}

/// Trait-object version of backend endpoint receiver.
pub type DynBackEpRecvCon = Box<dyn BackEpRecvCon + 'static + Send>;
138 changes: 138 additions & 0 deletions crates/tx5/src/backend/go_pion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
//! go pion backend
use super::*;
use crate::Config;

struct GoCon(tx5_connection::FramedConn);

impl BackCon for GoCon {
fn send(&self, data: Vec<u8>) -> BoxFuture<'_, Result<()>> {
Box::pin(async { self.0.send(data).await })
}

fn pub_key(&self) -> &PubKey {
self.0.pub_key()
}

fn is_using_webrtc(&self) -> bool {
self.0.is_using_webrtc()
}

fn get_stats(&self) -> tx5_connection::ConnStats {
self.0.get_stats()
}
}

struct GoConRecvData(tx5_connection::FramedConnRecv);

impl BackConRecvData for GoConRecvData {
fn recv(&mut self) -> BoxFuture<'_, Option<Vec<u8>>> {
Box::pin(async { self.0.recv().await })
}
}

struct GoWaitCon {
pub_key: PubKey,
con: Option<Arc<tx5_connection::Conn>>,
con_recv: Option<tx5_connection::ConnRecv>,
}

impl BackWaitCon for GoWaitCon {
fn wait(
&mut self,
recv_limit: Arc<tokio::sync::Semaphore>,
) -> BoxFuture<'static, Result<(DynBackCon, DynBackConRecvData)>> {
let con = self.con.take();
let con_recv = self.con_recv.take();
Box::pin(async move {
let (con, con_recv) = match (con, con_recv) {
(Some(con), Some(con_recv)) => (con, con_recv),
_ => return Err(std::io::Error::other("already awaited")),
};

con.ready().await;

let (con, con_recv) =
tx5_connection::FramedConn::new(con, con_recv, recv_limit)
.await?;

let con: DynBackCon = Arc::new(GoCon(con));
let con_recv: DynBackConRecvData =
Box::new(GoConRecvData(con_recv));

Ok((con, con_recv))
})
}

fn pub_key(&self) -> &PubKey {
&self.pub_key
}
}

struct GoEp(tx5_connection::Hub);

impl BackEp for GoEp {
fn connect(
&self,
pub_key: PubKey,
) -> BoxFuture<'_, Result<DynBackWaitCon>> {
Box::pin(async {
let (con, con_recv) = self.0.connect(pub_key).await?;
let pub_key = con.pub_key().clone();
let wc: DynBackWaitCon = Box::new(GoWaitCon {
pub_key,
con: Some(con),
con_recv: Some(con_recv),
});
Ok(wc)
})
}

fn pub_key(&self) -> &PubKey {
self.0.pub_key()
}
}

struct GoEpRecvCon(tx5_connection::HubRecv);

impl BackEpRecvCon for GoEpRecvCon {
fn recv(&mut self) -> BoxFuture<'_, Option<DynBackWaitCon>> {
Box::pin(async {
let (con, con_recv) = self.0.accept().await?;
let pub_key = con.pub_key().clone();
let wc: DynBackWaitCon = Box::new(GoWaitCon {
pub_key,
con: Some(con),
con_recv: Some(con_recv),
});
Some(wc)
})
}
}

/// Get a default version of the module-specific config.
pub fn default_config() -> serde_json::Value {
serde_json::json!({})
}

/// Connect a new backend based on the tx5-go-pion backend.
pub async fn connect(
config: &Arc<Config>,
url: &str,
listener: bool,
) -> Result<(DynBackEp, DynBackEpRecvCon)> {
let webrtc_config = config.initial_webrtc_config.clone().into_bytes();
let sig_config = tx5_connection::tx5_signal::SignalConfig {
listener,
allow_plain_text: config.signal_allow_plain_text,
//max_connections: config.connection_count_max as usize,
max_idle: config.timeout,
..Default::default()
};
let (hub, hub_recv) =
tx5_connection::Hub::new(webrtc_config, url, Arc::new(sig_config))
.await?;
let ep: DynBackEp = Arc::new(GoEp(hub));
let ep_recv: DynBackEpRecvCon = Box::new(GoEpRecvCon(hub_recv));
Ok((ep, ep_recv))
}
12 changes: 12 additions & 0 deletions crates/tx5/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::*;
use tx5_core::deps::serde_json;

/// Tx5 endpoint configuration.
pub struct Config {
Expand Down Expand Up @@ -40,6 +41,15 @@ pub struct Config {
/// set the callbacks here, otherwise no preflight will
/// be sent nor validated. Default: None.
pub preflight: Option<(PreflightSendCb, PreflightCheckCb)>,

/// The backend connection module to use.
/// For the most part you should just leave this at the default.
pub backend_module: crate::backend::BackendModule,

/// The backend module config to use.
/// For the most part you should just leave this set at `None`,
/// to get the default backend config.
pub backend_module_config: Option<serde_json::Value>,
}

impl std::fmt::Debug for Config {
Expand Down Expand Up @@ -81,6 +91,8 @@ impl Default for Config {
backoff_start: std::time::Duration::from_secs(5),
backoff_max: std::time::Duration::from_secs(60),
preflight: None,
backend_module: crate::backend::BackendModule::default(),
backend_module_config: None,
}
}
}
15 changes: 2 additions & 13 deletions crates/tx5/src/ep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ impl EndpointRecv {
pub(crate) struct EpInner {
this: Weak<Mutex<EpInner>>,
config: Arc<Config>,
webrtc_config: Vec<u8>,
recv_limit: Arc<tokio::sync::Semaphore>,
evt_send: tokio::sync::mpsc::Sender<EndpointEvent>,
sig_map: HashMap<SigUrl, Arc<Sig>>,
Expand Down Expand Up @@ -117,7 +116,6 @@ impl EpInner {
Sig::new(
self.this.clone(),
self.config.clone(),
self.webrtc_config.clone(),
sig_url,
listener,
self.evt_send.clone(),
Expand Down Expand Up @@ -150,20 +148,14 @@ impl EpInner {
.clone()
}

pub fn accept_peer(
&mut self,
peer_url: PeerUrl,
conn: Arc<tx5_connection::Conn>,
conn_recv: tx5_connection::ConnRecv,
) {
pub fn accept_peer(&mut self, peer_url: PeerUrl, wc: DynBackWaitCon) {
self.peer_map.entry(peer_url.clone()).or_insert_with(|| {
Peer::new_accept(
self.config.clone(),
self.recv_limit.clone(),
self.this.clone(),
peer_url,
conn,
conn_recv,
wc,
self.evt_send.clone(),
)
});
Expand Down Expand Up @@ -201,12 +193,9 @@ impl Endpoint {
Self {
config: config.clone(),
inner: Arc::new_cyclic(|this| {
let webrtc_config =
config.initial_webrtc_config.as_bytes().to_vec();
Mutex::new(EpInner {
this: this.clone(),
config,
webrtc_config,
recv_limit,
evt_send,
sig_map: HashMap::default(),
Expand Down
3 changes: 3 additions & 0 deletions crates/tx5/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ pub type PreflightCheckCb = Arc<
+ Sync,
>;

pub mod backend;
use backend::*;

mod config;
pub use config::*;

Expand Down
Loading

0 comments on commit 60c2321

Please sign in to comment.