Skip to content

Commit

Permalink
checkpoint, need to cleanup conns on errors
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog committed Jan 25, 2024
1 parent b0b48f3 commit 6e03ebc
Show file tree
Hide file tree
Showing 6 changed files with 542 additions and 447 deletions.
206 changes: 113 additions & 93 deletions crates/tx5-go-pion/src/evt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,109 +158,129 @@ macro_rules! manager_access {
($id:ident, $rt:ident, $map:ident, $code:expr) => {
let $map = MANAGER.lock().unwrap().$map.get(&$id).cloned();
if let Some($map) = &$map {
if let Err(err) = $rt.block_on(async move {
tokio::time::timeout(
std::time::Duration::from_millis(18),
$code,
)
.await
.map_err(|_| Error::id("AppSlow"))?
}) {
$map.close(err.into());
match tokio::time::timeout(
std::time::Duration::from_millis(18),
$code,
)
.await
.map_err(|_| Error::id("AppSlow"))
{
Err(err) | Ok(Err(err)) => {
$map.close(err.into());
}
Ok(_) => (),
}
}
};
}

static MANAGER: Lazy<Mutex<Manager>> = Lazy::new(|| {
let runtime = tokio::runtime::Handle::current();
unsafe {
API.on_event(move |sys_evt| match sys_evt {
SysEvent::Error(_error) => (),
SysEvent::PeerConICECandidate {
peer_con_id,
candidate,
} => {
manager_access!(peer_con_id, runtime, peer_con, {
peer_con.send_evt(PeerConnectionEvent::ICECandidate(GoBuf(
candidate,
)))
});
}
SysEvent::PeerConStateChange {
peer_con_id,
peer_con_state,
} => {
manager_access!(peer_con_id, runtime, peer_con, {
peer_con.send_evt(PeerConnectionEvent::State(
PeerConnectionState::from_raw(peer_con_state),
))
});
}
SysEvent::PeerConDataChan {
peer_con_id,
data_chan_id,
} => {
manager_access!(peer_con_id, runtime, peer_con, async {
let recv_limit = match peer_con.get_recv_limit() {
Ok(recv_limit) => recv_limit,
Err(err) => {
API.data_chan_free(data_chan_id);
return Err(err);
}
};
// We don't want this channel to represent a significant
// memory buffer, but we also don't want it to be so small
// that it causes thread thrashing. Try 128 to start??
// max msg size is 16 KiB, so 16 * 128 = 2 MiB.
let (send, mut recv) = tokio::sync::mpsc::channel(128);

let (chan, recv) =
DataChannel::new(data_chan_id, recv_limit);
unsafe {
API.on_event(move |sys_evt| {
let _ = send.blocking_send(sys_evt);
});
}

peer_con
.send_evt(PeerConnectionEvent::DataChannel(chan, recv))
.await
});
}
SysEvent::DataChanClose(data_chan_id) => {
manager_access!(data_chan_id, runtime, data_chan, {
data_chan.send_evt(DataChannelEvent::Close)
});
}
SysEvent::DataChanOpen(data_chan_id) => {
manager_access!(data_chan_id, runtime, data_chan, {
data_chan.send_evt(DataChannelEvent::Open)
});
}
SysEvent::DataChanMessage {
data_chan_id,
buffer_id,
} => {
let mut buf = GoBuf(buffer_id);
manager_access!(data_chan_id, runtime, data_chan, async {
let len = buf.len()?;
if len > 16 * 1024 {
return Err(Error::id("MsgTooLarge"));
}

let recv_limit = data_chan.get_recv_limit()?;

let permit = recv_limit
.acquire_many_owned(len as u32)
.await
.map_err(|_| {
Error::from(Error::id(
"DataChanMessageSemaphoreClosed",
tokio::task::spawn(async move {
while let Some(sys_evt) = recv.recv().await {
match sys_evt {
SysEvent::Error(_error) => (),
SysEvent::PeerConICECandidate {
peer_con_id,
candidate,
} => {
manager_access!(peer_con_id, runtime, peer_con, {
peer_con.send_evt(PeerConnectionEvent::ICECandidate(
GoBuf(candidate),
))
});
}
SysEvent::PeerConStateChange {
peer_con_id,
peer_con_state,
} => {
manager_access!(peer_con_id, runtime, peer_con, {
peer_con.send_evt(PeerConnectionEvent::State(
PeerConnectionState::from_raw(peer_con_state),
))
});
}
SysEvent::PeerConDataChan {
peer_con_id,
data_chan_id,
} => {
manager_access!(peer_con_id, runtime, peer_con, async {
let recv_limit = match peer_con.get_recv_limit() {
Ok(recv_limit) => recv_limit,
Err(err) => {
unsafe {
API.data_chan_free(data_chan_id);
}
return Err(err);
}
};

let (chan, recv) =
DataChannel::new(data_chan_id, recv_limit);

peer_con
.send_evt(PeerConnectionEvent::DataChannel(
chan, recv,
))
})?;
.await
});
}
SysEvent::DataChanClose(data_chan_id) => {
manager_access!(data_chan_id, runtime, data_chan, {
data_chan.send_evt(DataChannelEvent::Close)
});
}
SysEvent::DataChanOpen(data_chan_id) => {
manager_access!(data_chan_id, runtime, data_chan, {
data_chan.send_evt(DataChannelEvent::Open)
});
}
SysEvent::DataChanMessage {
data_chan_id,
buffer_id,
} => {
let mut buf = GoBuf(buffer_id);
manager_access!(data_chan_id, runtime, data_chan, async {
let len = buf.len()?;
if len > 16 * 1024 {
return Err(Error::id("MsgTooLarge"));
}

data_chan
.send_evt(DataChannelEvent::Message(buf, permit))
.await
});
}
SysEvent::DataChanBufferedAmountLow(data_chan_id) => {
manager_access!(data_chan_id, runtime, data_chan, {
data_chan.send_evt(DataChannelEvent::BufferedAmountLow)
});
let recv_limit = data_chan.get_recv_limit()?;

let permit = recv_limit
.acquire_many_owned(len as u32)
.await
.map_err(|_| {
Error::from(Error::id(
"DataChanMessageSemaphoreClosed",
))
})?;

data_chan
.send_evt(DataChannelEvent::Message(buf, permit))
.await
});
}
SysEvent::DataChanBufferedAmountLow(data_chan_id) => {
manager_access!(data_chan_id, runtime, data_chan, {
data_chan.send_evt(DataChannelEvent::BufferedAmountLow)
});
}
}
});
}
}
});

Manager::new()
});
102 changes: 2 additions & 100 deletions crates/tx5/src/back_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,60 +8,9 @@ pub(crate) mod imp {
pub use imp_go_pion::*;
}

/// Tx5 buffer creation type via std::io::Write.
pub struct BackBufWriter {
imp: imp::ImpWriter,
_not_sync: std::marker::PhantomData<std::cell::Cell<()>>,
}

impl BackBufWriter {
/// Create a new Tx5 buffer writer.
#[inline]
pub fn new() -> Result<Self> {
Ok(Self {
imp: imp::ImpWriter::new()?,
_not_sync: std::marker::PhantomData,
})
}

/// Indicate we are done writing, and extract the internal buffer.
#[inline]
pub fn finish(self) -> BackBuf {
BackBuf {
imp: self.imp.finish(),
_not_sync: std::marker::PhantomData,
}
}
}

impl std::io::Write for BackBufWriter {
#[inline]
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.imp.write(buf)
}

#[inline]
fn write_vectored(
&mut self,
bufs: &[std::io::IoSlice<'_>],
) -> std::io::Result<usize> {
self.imp.write_vectored(bufs)
}

#[inline]
fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
self.imp.write_all(buf)
}

#[inline]
fn flush(&mut self) -> std::io::Result<()> {
self.imp.flush()
}
}

/// Tx5 buffer type for sending and receiving data.
#[allow(clippy::len_without_is_empty)]
pub struct BackBuf {
pub(crate) struct BackBuf {
pub(crate) imp: imp::Imp,
pub(crate) _not_sync: std::marker::PhantomData<std::cell::Cell<()>>,
}
Expand Down Expand Up @@ -89,12 +38,6 @@ impl BackBuf {
})
}

/// Build a tx5 buffer using std::io::Write.
#[inline]
pub fn from_writer() -> Result<BackBufWriter> {
BackBufWriter::new()
}

/// Serialize a type as json into a new BackBuf.
#[inline]
pub fn from_json<S: serde::Serialize>(s: S) -> Result<Self> {
Expand All @@ -109,31 +52,6 @@ impl BackBuf {
pub fn len(&mut self) -> Result<usize> {
self.imp.len()
}

/// Attempt to clone this buffer.
#[inline]
pub fn try_clone(&mut self) -> Result<Self> {
Ok(Self {
imp: self.imp.try_clone()?,
_not_sync: std::marker::PhantomData,
})
}

/// Copy the buffer out into a rust `Vec<u8>`.
#[inline]
pub fn to_vec(&mut self) -> Result<Vec<u8>> {
self.imp.to_vec()
}

/// Deserialize this buffer as json bytes
/// into a type implementing serde::DeserializeOwned.
#[inline]
pub fn to_json<D>(&mut self) -> Result<D>
where
D: serde::de::DeserializeOwned + Sized,
{
self.imp.to_json()
}
}

impl std::io::Read for BackBuf {
Expand All @@ -144,30 +62,14 @@ impl std::io::Read for BackBuf {
}

/// Conversion type facilitating Into<&mut BackBuf>.
pub enum BackBufRef<'lt> {
pub(crate) enum BackBufRef<'lt> {
/// An owned BackBuf.
Owned(Result<BackBuf>),

/// A borrowed BackBuf.
Borrowed(Result<&'lt mut BackBuf>),
}

impl<'lt> BackBufRef<'lt> {
/// Get a mutable reference to the buffer.
pub fn as_mut_ref(&'lt mut self) -> Result<&'lt mut BackBuf> {
match self {
BackBufRef::Owned(o) => match o {
Ok(o) => Ok(o),
Err(e) => Err(e.err_clone()),
},
BackBufRef::Borrowed(b) => match b {
Ok(b) => Ok(b),
Err(e) => Err(e.err_clone()),
},
}
}
}

impl From<BackBuf> for BackBufRef<'static> {
fn from(b: BackBuf) -> Self {
Self::Owned(Ok(b))
Expand Down
Loading

0 comments on commit 6e03ebc

Please sign in to comment.