Skip to content

Commit

Permalink
feat: multicast
Browse files Browse the repository at this point in the history
  • Loading branch information
decahedron1 committed Jul 4, 2024
1 parent 48fd04f commit 22f76d0
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 4 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ chrono = { version = "0.4", default-features = false, features = [ "clock", "std
serde = { version = "1.0", optional = true, features = [ "derive" ] }
serde-aux = { version = "4.4", optional = true }
uuid = { version = "1.5", optional = true }
reqwest = { version = "0.12", optional = true }
reqwest = { version = "0.12", default-features = false, optional = true, features = [ "charset", "http2" ] }
simd-json = { version = "0.13", optional = true }
url = { version = "2.5", optional = true }
rand = { version = "0.8", optional = true }
regex = { version = "1.10", optional = true }
async-stream = "0.3"
pin-project-lite = "0.2"

[dev-dependencies]
anyhow = "1.0"
Expand All @@ -33,5 +34,5 @@ default = [ "tls-native", "twitch", "youtube" ]
twitch = [ "dep:irc", "dep:uuid" ]
youtube = [ "dep:simd-json", "dep:reqwest", "dep:rand", "dep:serde", "dep:url", "dep:regex", "dep:serde-aux" ]
serde = [ "dep:serde", "chrono/serde", "uuid?/serde" ]
tls-native = [ "irc?/tls-native" ]
tls-rust = [ "irc?/tls-rust" ]
tls-native = [ "irc?/tls-native", "reqwest/native-tls" ]
tls-rust = [ "irc?/tls-rust", "reqwest/rustls-tls" ]
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ A live chat interface for Twitch & YouTube written in Rust.
* ⚡ Low latency
* ⏪ Supports VODs
* 🔓 No authentication required
- 🤝 **Simulcast** - Receive from multiple streams & platforms simultaneously

## Usage
See [`examples/twitch.rs`](https://github.com/vitri-ent/brainrot/blob/main/examples/twitch.rs) & [`examples/youtube.rs`](https://github.com/vitri-ent/brainrot/blob/main/examples/youtube.rs).
Expand Down
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ pub use self::twitch::{Chat as TwitchChat, ChatEvent as TwitchChatEvent, Message
#[cfg(feature = "youtube")]
pub mod youtube;

#[cfg(all(feature = "twitch", feature = "youtube"))]
mod multicast;
#[cfg(all(feature = "twitch", feature = "youtube"))]
pub use self::multicast::{Multicast, MulticastError, VariantChat};

pub(crate) mod util;
115 changes: 115 additions & 0 deletions src/multicast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2024 pyke.io
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{pin::Pin, task::Poll};

use futures_util::Stream;
use pin_project_lite::pin_project;
use thiserror::Error;

use crate::{twitch, youtube};

#[derive(Debug, Error)]
pub enum MulticastError {
#[error("{0}")]
TwitchError(irc::error::Error),
#[error("{0}")]
YouTubeError(youtube::Error)
}

#[derive(Debug)]
pub enum VariantChat {
Twitch(twitch::ChatEvent),
YouTube(youtube::Action)
}

pin_project! {
#[project = VariantStreamProject]
enum VariantStream<'a> {
Twitch { #[pin] x: crate::twitch::Chat },
YouTube { #[pin] x: Pin<Box<dyn Stream<Item = Result<youtube::Action, youtube::Error>> + 'a>> }
}
}

impl<'a> Stream for VariantStream<'a> {
type Item = Result<VariantChat, MulticastError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
match self.project() {
VariantStreamProject::YouTube { x } => {
Poll::Ready(futures_util::ready!(x.poll_next(cx)).map(|x| x.map(|c| VariantChat::YouTube(c)).map_err(MulticastError::YouTubeError)))
}
VariantStreamProject::Twitch { x } => {
Poll::Ready(futures_util::ready!(x.poll_next(cx)).map(|x| x.map(|c| VariantChat::Twitch(c)).map_err(MulticastError::TwitchError)))
}
}
}
}

impl<'a> From<crate::twitch::Chat> for VariantStream<'a> {
fn from(value: crate::twitch::Chat) -> Self {
Self::Twitch { x: value }
}
}

impl<'a> From<Pin<Box<dyn Stream<Item = Result<youtube::Action, youtube::Error>> + 'a>>> for VariantStream<'a> {
fn from(value: Pin<Box<dyn Stream<Item = Result<youtube::Action, youtube::Error>> + 'a>>) -> Self {
Self::YouTube { x: value }
}
}

pin_project! {
pub struct Multicast<'a> {
#[pin]
streams: Vec<VariantStream<'a>>
}
}

impl<'a> Multicast<'a> {
pub fn new() -> Self {
Self { streams: vec![] }
}

pub fn push<'b: 'a>(&mut self, stream: impl Into<VariantStream<'b>>) {

Check warning on line 84 in src/multicast.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

type `VariantStream<'b>` is more private than the item `Multicast::<'a>::push`

Check warning on line 84 in src/multicast.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

type `VariantStream<'b>` is more private than the item `Multicast::<'a>::push`

Check warning on line 84 in src/multicast.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

type `VariantStream<'b>` is more private than the item `Multicast::<'a>::push`

Check warning on line 84 in src/multicast.rs

View workflow job for this annotation

GitHub Actions / Build and test (windows-latest, stable)

type `VariantStream<'b>` is more private than the item `Multicast::<'a>::push`

Check warning on line 84 in src/multicast.rs

View workflow job for this annotation

GitHub Actions / Build and test (windows-latest, stable)

type `VariantStream<'b>` is more private than the item `Multicast::<'a>::push`

Check warning on line 84 in src/multicast.rs

View workflow job for this annotation

GitHub Actions / Build and test (windows-latest, stable)

type `VariantStream<'b>` is more private than the item `Multicast::<'a>::push`

Check warning on line 84 in src/multicast.rs

View workflow job for this annotation

GitHub Actions / Build and test (macos-latest, stable)

type `VariantStream<'b>` is more private than the item `Multicast::<'a>::push`

Check warning on line 84 in src/multicast.rs

View workflow job for this annotation

GitHub Actions / Build and test (macos-latest, stable)

type `VariantStream<'b>` is more private than the item `Multicast::<'a>::push`

Check warning on line 84 in src/multicast.rs

View workflow job for this annotation

GitHub Actions / Build and test (macos-latest, stable)

type `VariantStream<'b>` is more private than the item `Multicast::<'a>::push`
self.streams.push(stream.into());
}

pub async fn push_twitch(&mut self, channel: &str, auth: impl twitch::TwitchIdentity) -> Result<(), irc::error::Error> {
self.push(twitch::Chat::new(channel, auth).await?);
Ok(())
}

pub async fn push_youtube<'b: 'a>(&mut self, context: &'b youtube::ChatContext) -> Result<(), youtube::Error> {
self.push(youtube::stream(context).await?);
Ok(())
}
}

impl<'a> Stream for Multicast<'a> {
type Item = Result<VariantChat, MulticastError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
let mut this = self.project();
let mut res = Poll::Ready(None);
for i in 0..this.streams.len() {
let stream = unsafe { Pin::new_unchecked(this.streams.as_mut().get_unchecked_mut().get_mut(i).unwrap()) };
match stream.poll_next(cx) {
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
Poll::Ready(None) => continue,
Poll::Pending => res = Poll::Pending
}
}
res
}
}
2 changes: 1 addition & 1 deletion src/youtube/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl<'r> IntoIterator for ActionChunk<'r> {
}
}

pub async fn stream(options: &ChatContext) -> Result<Pin<Box<impl Stream<Item = Result<Action, Error>> + '_>>, Error> {
pub async fn stream(options: &ChatContext) -> Result<Pin<Box<dyn Stream<Item = Result<Action, Error>> + '_>>, Error> {
let initial_chat = GetLiveChatResponse::fetch(options, &options.initial_continuation).await?;

let (mut yield_tx, yield_rx) = unsafe { async_stream::__private::yielder::pair() };
Expand Down

0 comments on commit 22f76d0

Please sign in to comment.