Skip to content

Commit

Permalink
integrate api changes into tx5 high level
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog committed Nov 28, 2023
1 parent 668b64e commit e01a1f6
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 111 deletions.
10 changes: 7 additions & 3 deletions crates/tx5-go-pion/src/data_chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl WeakDataChan {
if let Ok(core) = &mut *lock {
core.close(err.clone());
}
*lock = Err(err.into());
*lock = Err(err);
}
}

Expand All @@ -78,7 +78,9 @@ impl WeakDataChan {
}

pub async fn send_evt(&self, evt: DataChannelEvent) -> Result<()> {
data_chan_weak_core!(self.0, core, { core.evt_send.send(evt).await })
data_chan_weak_core!(self.0, core, { Ok(core.evt_send.clone()) })?
.send(evt)
.await
}
}

Expand Down Expand Up @@ -116,6 +118,8 @@ impl DataChannel {
if let Ok(ready_state) =
unsafe { API.data_chan_ready_state(data_chan_id) }
{
// this is a terrible suggestion clippy
#[allow(clippy::comparison_chain)]
if ready_state == 2 {
let _ = evt_send.send.send((DataChannelEvent::Open, None));
} else if ready_state > 2 {
Expand All @@ -132,7 +136,7 @@ impl DataChannel {
if let Ok(core) = &mut *lock {
core.close(err.clone());
}
*lock = Err(err.into());
*lock = Err(err);
}

fn get_data_chan_id(&self) -> Result<usize> {
Expand Down
2 changes: 1 addition & 1 deletion crates/tx5-go-pion/src/evt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ static MANAGER: Lazy<Mutex<Manager>> = Lazy::new(|| {
manager_access!(data_chan_id, runtime, data_chan, async {
let len = buf.len()?;
if len > 16 * 1024 {
return Err(Error::id("MsgTooLarge").into());
return Err(Error::id("MsgTooLarge"));
}

let recv_limit = data_chan.get_recv_limit()?;
Expand Down
8 changes: 5 additions & 3 deletions crates/tx5-go-pion/src/peer_con.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl WeakPeerCon {
if let Ok(core) = &mut *lock {
core.close(err.clone());
}
*lock = Err(err.into());
*lock = Err(err);
}
}

Expand All @@ -169,7 +169,9 @@ impl WeakPeerCon {
}

pub async fn send_evt(&self, evt: PeerConnectionEvent) -> Result<()> {
peer_con_weak_core!(self.0, core, { core.evt_send.send(evt).await })
peer_con_weak_core!(self.0, core, { Ok(core.evt_send.clone()) })?
.send(evt)
.await
}
}

Expand Down Expand Up @@ -213,7 +215,7 @@ impl PeerConnection {
if let Ok(core) = &mut *lock {
core.close(err.clone());
}
*lock = Err(err.into());
*lock = Err(err);
}

fn get_peer_con_id(&self) -> Result<usize> {
Expand Down
6 changes: 1 addition & 5 deletions crates/tx5/examples/turn_doctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,8 @@ async fn gather_ice(
let _ = tokio::time::timeout(
std::time::Duration::from_secs(10),
async move {
let (s, mut r) = tokio::sync::mpsc::unbounded_channel();
let mut con = tx5_go_pion::PeerConnection::new(
let (mut con, mut r) = tx5_go_pion::PeerConnection::new(
config,
move |evt| {
let _ = s.send(evt);
},
std::sync::Arc::new(tokio::sync::Semaphore::new(
usize::MAX >> 3,
)),
Expand Down
175 changes: 76 additions & 99 deletions crates/tx5/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,33 +362,16 @@ async fn new_conn_task(
use tx5_go_pion::PeerConnectionEvent as PeerEvt;
use tx5_go_pion::PeerConnectionState as PeerState;

enum MultiEvt {
OneSec,
Stats(
tokio::sync::oneshot::Sender<
Option<HashMap<String, BackendMetrics>>,
>,
),
Peer(PeerEvt),
Data(DataEvt),
}

let (peer_snd, mut peer_rcv) = tokio::sync::mpsc::unbounded_channel();

let peer_snd2 = peer_snd.clone();
let mut peer = match async {
let (mut peer, mut peer_recv) = match async {
let peer_config = BackBuf::from_json(ice_servers)?;

let peer = tx5_go_pion::PeerConnection::new(
let (peer, peer_recv) = tx5_go_pion::PeerConnection::new(
peer_config.imp.buf,
move |evt| {
let _ = peer_snd2.send(MultiEvt::Peer(evt));
},
seed.rcv_limit().clone(),
)
.await?;

Result::Ok(peer)
Result::Ok((peer, peer_recv))
}
.await
{
Expand Down Expand Up @@ -419,37 +402,8 @@ async fn new_conn_task(
}
}

let peer_snd_task = peer_snd.clone();
tokio::task::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
if peer_snd_task.send(MultiEvt::OneSec).is_err() {
break;
}
}
});

let slot: Arc<std::sync::Mutex<Option<HashMap<String, BackendMetrics>>>> =
Arc::new(std::sync::Mutex::new(None));
let weak_slot = Arc::downgrade(&slot);
let peer_snd_task = peer_snd.clone();
tokio::task::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;

if let Some(slot) = weak_slot.upgrade() {
let (s, r) = tokio::sync::oneshot::channel();
if peer_snd_task.send(MultiEvt::Stats(s)).is_err() {
break;
}
if let Ok(stats) = r.await {
*slot.lock().unwrap() = stats;
}
} else {
break;
}
}
});

let weak_slot = Arc::downgrade(&slot);
let _unregister = {
Expand Down Expand Up @@ -541,6 +495,9 @@ async fn new_conn_task(
};

let mut data_chan: Option<tx5_go_pion::DataChannel> = None;
let mut data_chan_recv: Option<
tx5_go_pion::EventRecv<tx5_go_pion::DataChannelEvent>,
> = None;
let mut data_chan_ready = false;

let mut check_data_chan_ready =
Expand All @@ -565,41 +522,82 @@ async fn new_conn_task(

tracing::debug!(?conn_uniq, "PEER CON OPEN");

let mut one_sec = tokio::time::interval(std::time::Duration::from_secs(1));
one_sec.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut five_sec = tokio::time::interval(std::time::Duration::from_secs(5));
five_sec.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

loop {
tokio::select! {
msg = peer_rcv.recv() => {
_ = one_sec.tick() => {
if let Some(data_chan) = data_chan.as_mut() {
if let Ok(buf) = data_chan.buffered_amount() {
if buf <= config.per_data_chan_buf_low() {
conn_state.check_send_waiting(Some(state::BufState::Low)).await;
}
}
}
}
_ = five_sec.tick() => {
if let Ok(mut buf) = peer.stats().await.map(BackBuf::from_raw) {
if let Ok(val) = buf.to_json() {
*slot.lock().unwrap() = Some(val);
} else {
*slot.lock().unwrap() = None;
}
} else {
*slot.lock().unwrap() = None;
}
}
msg = async {
if let Some(data_chan_recv) = &mut data_chan_recv {
data_chan_recv.recv().await
} else {
std::future::pending().await
}
} => {
match msg {
None => {
conn_state.close(Error::id("PeerConClosed"));
break;
}
Some(MultiEvt::OneSec) => {
if let Some(data_chan) = data_chan.as_mut() {
if let Ok(buf) = data_chan.buffered_amount() {
if buf <= config.per_data_chan_buf_low() {
conn_state.check_send_waiting(Some(state::BufState::Low)).await;
}
}
Some(DataEvt::Error(err)) => {
tracing::warn!(?err, "ConnectionError");
conn_state.close(err.into());
break;
}
Some(DataEvt::Open) => {
if check_data_chan_ready(&mut data_chan).is_err() {
break;
}
}
Some(MultiEvt::Stats(resp)) => {
if let Ok(mut buf) = peer.stats().await.map(BackBuf::from_raw) {
if let Ok(val) = buf.to_json() {
let _ = resp.send(Some(val));
} else {
let _ = resp.send(None);
}
} else {
let _ = resp.send(None);
Some(DataEvt::Close) => {
conn_state.close(Error::id("DataChanClosed"));
break;
}
Some(DataEvt::Message(buf, permit)) => {
if conn_state.rcv_data(BackBuf::from_raw(buf), permit).is_err() {
break;
}
}
Some(MultiEvt::Peer(PeerEvt::Error(err)))
| Some(MultiEvt::Data(DataEvt::Error(err))) => {
Some(DataEvt::BufferedAmountLow) => {
tracing::debug!(?conn_uniq, "BufferedAmountLow");
conn_state.check_send_waiting(Some(state::BufState::Low)).await;
}
}
}
msg = peer_recv.recv() => {
match msg {
None => {
conn_state.close(Error::id("PeerConClosed"));
break;
}
Some(PeerEvt::Error(err)) => {
tracing::warn!(?err, "ConnectionError");
conn_state.close(err);
conn_state.close(err.into());
break;
}
Some(MultiEvt::Peer(PeerEvt::State(peer_state))) => {
Some(PeerEvt::State(peer_state)) => {
match peer_state {
PeerState::New
| PeerState::Connecting
Expand All @@ -614,57 +612,36 @@ async fn new_conn_task(
}
}
}
Some(MultiEvt::Peer(PeerEvt::ICECandidate(buf))) => {
Some(PeerEvt::ICECandidate(buf)) => {
let buf = BackBuf::from_raw(buf);
if conn_state.ice(buf).is_err() {
break;
}
}
Some(MultiEvt::Peer(PeerEvt::DataChannel(chan))) => {
let peer_snd = peer_snd.clone();
data_chan = Some(chan.handle(move |evt| {
let _ = peer_snd.send(MultiEvt::Data(evt));
}));
if check_data_chan_ready(&mut data_chan).is_err() {
break;
}
}
Some(MultiEvt::Data(DataEvt::Open)) => {
Some(PeerEvt::DataChannel(chan, chan_recv)) => {
data_chan = Some(chan);
data_chan_recv = Some(chan_recv);
if check_data_chan_ready(&mut data_chan).is_err() {
break;
}
}
Some(MultiEvt::Data(DataEvt::Close)) => {
conn_state.close(Error::id("DataChanClosed"));
break;
}
Some(MultiEvt::Data(DataEvt::Message(buf, permit))) => {
if conn_state.rcv_data(BackBuf::from_raw(buf), permit).is_err() {
break;
}
}
Some(MultiEvt::Data(DataEvt::BufferedAmountLow)) => {
tracing::debug!(?conn_uniq, "BufferedAmountLow");
conn_state.check_send_waiting(Some(state::BufState::Low)).await;
}
}
}
msg = conn_evt.recv() => {
match msg {
Some(Ok(state::ConnStateEvt::CreateOffer(mut resp))) => {
let peer = &mut peer;
let data_chan_w = &mut data_chan;
let peer_snd = peer_snd.clone();
let data_chan_recv_w = &mut data_chan_recv;
resp.with(move || async move {
let chan = peer.create_data_channel(
let (chan, chan_recv) = peer.create_data_channel(
tx5_go_pion::DataChannelConfig {
label: Some("data".into()),
}
).await?;

*data_chan_w = Some(chan.handle(move |evt| {
let _ = peer_snd.send(MultiEvt::Data(evt));
}));
*data_chan_w = Some(chan);
*data_chan_recv_w = Some(chan_recv);

let mut buf = peer.create_offer(
tx5_go_pion::OfferConfig::default(),
Expand Down

0 comments on commit e01a1f6

Please sign in to comment.