From 22f76d04957cbdfab0df5060803e0b58e7b93b08 Mon Sep 17 00:00:00 2001 From: "Carson M." Date: Thu, 4 Jul 2024 15:24:26 -0500 Subject: [PATCH] feat: multicast --- Cargo.toml | 7 +-- README.md | 1 + src/lib.rs | 5 ++ src/multicast.rs | 115 +++++++++++++++++++++++++++++++++++++++++++++ src/youtube/mod.rs | 2 +- 5 files changed, 126 insertions(+), 4 deletions(-) create mode 100644 src/multicast.rs diff --git a/Cargo.toml b/Cargo.toml index f6f06db..0a46c64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,12 +17,13 @@ chrono = { version = "0.4", default-features = false, features = [ "clock", "std serde = { version = "1.0", optional = true, features = [ "derive" ] } serde-aux = { version = "4.4", optional = true } uuid = { version = "1.5", optional = true } -reqwest = { version = "0.12", optional = true } +reqwest = { version = "0.12", default-features = false, optional = true, features = [ "charset", "http2" ] } simd-json = { version = "0.13", optional = true } url = { version = "2.5", optional = true } rand = { version = "0.8", optional = true } regex = { version = "1.10", optional = true } async-stream = "0.3" +pin-project-lite = "0.2" [dev-dependencies] anyhow = "1.0" @@ -33,5 +34,5 @@ default = [ "tls-native", "twitch", "youtube" ] twitch = [ "dep:irc", "dep:uuid" ] youtube = [ "dep:simd-json", "dep:reqwest", "dep:rand", "dep:serde", "dep:url", "dep:regex", "dep:serde-aux" ] serde = [ "dep:serde", "chrono/serde", "uuid?/serde" ] -tls-native = [ "irc?/tls-native" ] -tls-rust = [ "irc?/tls-rust" ] +tls-native = [ "irc?/tls-native", "reqwest/native-tls" ] +tls-rust = [ "irc?/tls-rust", "reqwest/rustls-tls" ] diff --git a/README.md b/README.md index ce611e4..20f6b66 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ A live chat interface for Twitch & YouTube written in Rust. * ⚡ Low latency * ⏪ Supports VODs * 🔓 No authentication required +- 🤝 **Simulcast** - Receive from multiple streams & platforms simultaneously ## Usage See [`examples/twitch.rs`](https://github.com/vitri-ent/brainrot/blob/main/examples/twitch.rs) & [`examples/youtube.rs`](https://github.com/vitri-ent/brainrot/blob/main/examples/youtube.rs). diff --git a/src/lib.rs b/src/lib.rs index 801f754..82676b3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,4 +20,9 @@ pub use self::twitch::{Chat as TwitchChat, ChatEvent as TwitchChatEvent, Message #[cfg(feature = "youtube")] pub mod youtube; +#[cfg(all(feature = "twitch", feature = "youtube"))] +mod multicast; +#[cfg(all(feature = "twitch", feature = "youtube"))] +pub use self::multicast::{Multicast, MulticastError, VariantChat}; + pub(crate) mod util; diff --git a/src/multicast.rs b/src/multicast.rs new file mode 100644 index 0000000..c853a7a --- /dev/null +++ b/src/multicast.rs @@ -0,0 +1,115 @@ +// Copyright 2024 pyke.io +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{pin::Pin, task::Poll}; + +use futures_util::Stream; +use pin_project_lite::pin_project; +use thiserror::Error; + +use crate::{twitch, youtube}; + +#[derive(Debug, Error)] +pub enum MulticastError { + #[error("{0}")] + TwitchError(irc::error::Error), + #[error("{0}")] + YouTubeError(youtube::Error) +} + +#[derive(Debug)] +pub enum VariantChat { + Twitch(twitch::ChatEvent), + YouTube(youtube::Action) +} + +pin_project! { + #[project = VariantStreamProject] + enum VariantStream<'a> { + Twitch { #[pin] x: crate::twitch::Chat }, + YouTube { #[pin] x: Pin> + 'a>> } + } +} + +impl<'a> Stream for VariantStream<'a> { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + match self.project() { + VariantStreamProject::YouTube { x } => { + Poll::Ready(futures_util::ready!(x.poll_next(cx)).map(|x| x.map(|c| VariantChat::YouTube(c)).map_err(MulticastError::YouTubeError))) + } + VariantStreamProject::Twitch { x } => { + Poll::Ready(futures_util::ready!(x.poll_next(cx)).map(|x| x.map(|c| VariantChat::Twitch(c)).map_err(MulticastError::TwitchError))) + } + } + } +} + +impl<'a> From for VariantStream<'a> { + fn from(value: crate::twitch::Chat) -> Self { + Self::Twitch { x: value } + } +} + +impl<'a> From> + 'a>>> for VariantStream<'a> { + fn from(value: Pin> + 'a>>) -> Self { + Self::YouTube { x: value } + } +} + +pin_project! { + pub struct Multicast<'a> { + #[pin] + streams: Vec> + } +} + +impl<'a> Multicast<'a> { + pub fn new() -> Self { + Self { streams: vec![] } + } + + pub fn push<'b: 'a>(&mut self, stream: impl Into>) { + self.streams.push(stream.into()); + } + + pub async fn push_twitch(&mut self, channel: &str, auth: impl twitch::TwitchIdentity) -> Result<(), irc::error::Error> { + self.push(twitch::Chat::new(channel, auth).await?); + Ok(()) + } + + pub async fn push_youtube<'b: 'a>(&mut self, context: &'b youtube::ChatContext) -> Result<(), youtube::Error> { + self.push(youtube::stream(context).await?); + Ok(()) + } +} + +impl<'a> Stream for Multicast<'a> { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + let mut this = self.project(); + let mut res = Poll::Ready(None); + for i in 0..this.streams.len() { + let stream = unsafe { Pin::new_unchecked(this.streams.as_mut().get_unchecked_mut().get_mut(i).unwrap()) }; + match stream.poll_next(cx) { + Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), + Poll::Ready(None) => continue, + Poll::Pending => res = Poll::Pending + } + } + res + } +} diff --git a/src/youtube/mod.rs b/src/youtube/mod.rs index 89a3380..f98cee1 100644 --- a/src/youtube/mod.rs +++ b/src/youtube/mod.rs @@ -131,7 +131,7 @@ impl<'r> IntoIterator for ActionChunk<'r> { } } -pub async fn stream(options: &ChatContext) -> Result> + '_>>, Error> { +pub async fn stream(options: &ChatContext) -> Result> + '_>>, Error> { let initial_chat = GetLiveChatResponse::fetch(options, &options.initial_continuation).await?; let (mut yield_tx, yield_rx) = unsafe { async_stream::__private::yielder::pair() };