diff --git a/examples/youtube.rs b/examples/youtube.rs index 29b4827..805abba 100644 --- a/examples/youtube.rs +++ b/examples/youtube.rs @@ -1,14 +1,48 @@ -use std::{env::args, future::IntoFuture}; +use std::future::IntoFuture; -use brainrot::youtube; -use futures_util::StreamExt; +use brainrot::youtube::{self, YouTubeChatPageProcessor}; #[tokio::main] async fn main() -> anyhow::Result<()> { - let (options, cont) = youtube::get_options_from_live_page("6DcXroWNDvk").await?; - let initial_chat = youtube::fetch_yt_chat_page(&options, cont).await?; - let subscriber = youtube::SignalerChannel::new_from_cont(&initial_chat).await?; - let (receiver, handle) = subscriber.spawn_event_subscriber().await?; - handle.into_future().await.unwrap(); + let (options, cont) = youtube::get_options_from_live_page("S144F6Cifyc").await?; + let initial_chat = youtube::fetch_yt_chat_page(&options, &cont).await?; + let topic = initial_chat.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0] + .invalidation_continuation_data + .as_ref() + .unwrap() + .invalidation_id + .topic + .to_owned(); + let subscriber = youtube::SignalerChannel::new(topic).await?; + let (mut receiver, _handle) = subscriber.spawn_event_subscriber().await?; + tokio::spawn(async move { + let mut processor = YouTubeChatPageProcessor::new(initial_chat, &options).unwrap(); + for msg in &processor { + println!("{}: {}", msg.author.display_name, msg.runs.iter().map(|c| c.to_string()).collect::()); + } + + while receiver.recv().await.is_ok() { + match processor.cont().await { + Some(Ok(s)) => { + processor = s; + for msg in &processor { + println!("{}: {}", msg.author.display_name, msg.runs.iter().map(|c| c.to_string()).collect::()); + } + + subscriber.refresh_topic(processor.signaler_topic.as_ref().unwrap()).await; + } + Some(Err(e)) => { + eprintln!("{e:?}"); + break; + } + None => { + eprintln!("none"); + break; + } + } + } + }); + _handle.into_future().await.unwrap(); + println!("???"); Ok(()) } diff --git a/src/youtube/mod.rs b/src/youtube/mod.rs index 9557fec..9aca8d9 100644 --- a/src/youtube/mod.rs +++ b/src/youtube/mod.rs @@ -1,41 +1,26 @@ -use std::{ - collections::{HashMap, VecDeque}, - io::BufRead, - iter, - sync::{Arc, OnceLock} -}; +use std::{collections::VecDeque, sync::OnceLock}; -use rand::Rng; use regex::Regex; use reqwest::{ header::{self, HeaderMap, HeaderValue}, - Response, StatusCode -}; -use simd_json::{ - base::{ValueAsContainer, ValueAsScalar}, - OwnedValue + StatusCode }; use thiserror::Error; -use tokio::{ - sync::{broadcast, Mutex}, - task::JoinHandle -}; +use tokio::sync::Mutex; use url::Url; +mod signaler; mod types; mod util; +pub use self::signaler::SignalerChannel; use self::{ types::{Action, GetLiveChatBody, GetLiveChatResponse, MessageRun}, util::{SimdJsonRequestBody, SimdJsonResponseBody} }; -const GCM_SIGNALER_SRQE: &str = "https://signaler-pa.youtube.com/punctual/v1/chooseServer"; -const GCM_SIGNALER_PSUB: &str = "https://signaler-pa.youtube.com/punctual/multi-watch/channel"; const TANGO_LIVE_ENDPOINT: &str = "https://www.youtube.com/youtubei/v1/live_chat/get_live_chat"; const TANGO_REPLAY_ENDPOINT: &str = "https://www.youtube.com/youtubei/v1/live_chat/get_live_chat_replay"; -const LIVE_CHAT_BASE_TANGO_KEY: &str = "AIzaSyDZNkyC-AtROwMBpLfevIvqYk-Gfi8ZOeo"; - const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0"; #[derive(Debug, Error)] @@ -90,9 +75,9 @@ pub(crate) fn get_http_client() -> &'static reqwest::Client { #[derive(Clone, Debug)] pub struct RequestOptions { - pub api_key: String, - pub client_version: String, - pub live_status: bool + pub(crate) api_key: String, + pub(crate) client_version: String, + pub(crate) live_status: bool } pub async fn get_options_from_live_page(live_id: impl AsRef) -> Result<(RequestOptions, String), YouTubeError> { @@ -154,32 +139,88 @@ pub struct ChatMessage { pub is_super: bool, pub author: Author, pub timestamp: i64, - pub time_delta: i64 + pub time_delta: Option } pub struct YouTubeChatPageProcessor<'r> { actions: Mutex>, request_options: &'r RequestOptions, - continuation_token: Option + continuation_token: Option, + pub signaler_topic: Option } unsafe impl<'r> Send for YouTubeChatPageProcessor<'r> {} impl<'r> YouTubeChatPageProcessor<'r> { - pub fn new(response: GetLiveChatResponse, request_options: &'r RequestOptions, continuation_token: Option) -> Result { + pub fn new(response: GetLiveChatResponse, request_options: &'r RequestOptions) -> Result { + let continuation_token = if request_options.live_status { + response + .continuation_contents + .as_ref() + .ok_or(YouTubeError::MissingContinuationContents)? + .live_chat_continuation + .continuations[0] + .invalidation_continuation_data + .as_ref() + .map(|x| x.continuation.to_owned()) + } else { + response + .continuation_contents + .as_ref() + .ok_or(YouTubeError::MissingContinuationContents)? + .live_chat_continuation + .continuations[0] + .live_chat_replay_continuation_data + .as_ref() + .map(|x| x.continuation.to_owned()) + }; + let signaler_topic = if request_options.live_status { + Some( + response.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0] + .invalidation_continuation_data + .as_ref() + .unwrap() + .invalidation_id + .topic + .to_owned() + ) + } else { + None + }; Ok(Self { - actions: Mutex::new(VecDeque::from( + actions: Mutex::new(VecDeque::from(if request_options.live_status { + response + .continuation_contents + .ok_or(YouTubeError::MissingContinuationContents)? + .live_chat_continuation + .actions + .unwrap_or_default() + } else { response .continuation_contents .ok_or(YouTubeError::MissingContinuationContents)? .live_chat_continuation .actions .ok_or(YouTubeError::EndOfContinuation)? - )), + })), request_options, - continuation_token + continuation_token, + signaler_topic }) } + + async fn next_page(&self, continuation_token: &String) -> Result { + let page = fetch_yt_chat_page(self.request_options, continuation_token).await?; + YouTubeChatPageProcessor::new(page, self.request_options) + } + + pub async fn cont(&self) -> Option> { + if let Some(continuation_token) = &self.continuation_token { + Some(self.next_page(continuation_token).await) + } else { + None + } + } } impl<'r> Iterator for &YouTubeChatPageProcessor<'r> { @@ -199,15 +240,25 @@ impl<'r> Iterator for &YouTubeChatPageProcessor<'r> { if let Some(add_chat_item_action) = action.add_chat_item_action { if let Some(text_message_renderer) = &add_chat_item_action.item.live_chat_text_message_renderer { if text_message_renderer.message.is_some() { - next_action.replace((add_chat_item_action, replay.video_offset_time_msec)); + next_action.replace((add_chat_item_action, Some(replay.video_offset_time_msec))); } } else if let Some(superchat_renderer) = &add_chat_item_action.item.live_chat_paid_message_renderer { if superchat_renderer.live_chat_text_message_renderer.message.is_some() { - next_action.replace((add_chat_item_action, replay.video_offset_time_msec)); + next_action.replace((add_chat_item_action, Some(replay.video_offset_time_msec))); } } } } + } else if let Some(action) = action.add_chat_item_action { + if let Some(text_message_renderer) = &action.item.live_chat_text_message_renderer { + if text_message_renderer.message.is_some() { + next_action.replace((action, None)); + } + } else if let Some(superchat_renderer) = &action.item.live_chat_paid_message_renderer { + if superchat_renderer.live_chat_text_message_renderer.message.is_some() { + next_action.replace((action, None)); + } + } } } None => return None @@ -221,7 +272,7 @@ impl<'r> Iterator for &YouTubeChatPageProcessor<'r> { } else if let Some(renderer) = next_action.item.live_chat_paid_message_renderer { renderer.live_chat_text_message_renderer } else { - panic!() + unimplemented!() }; Some(ChatMessage { @@ -258,215 +309,3 @@ pub async fn fetch_yt_chat_page(options: &RequestOptions, continuation: impl AsR .await?; Ok(response) } - -#[derive(Debug, Default)] -struct SignalerChannelInner { - topic: String, - gsessionid: Option, - sid: Option, - rid: usize, - aid: usize, - session_n: usize -} - -impl SignalerChannelInner { - pub fn with_topic(topic: impl ToString) -> Self { - Self { - topic: topic.to_string(), - ..Default::default() - } - } - - pub fn reset(&mut self) { - self.gsessionid = None; - self.sid = None; - self.rid = 0; - self.aid = 0; - self.session_n = 0; - } - - fn gen_zx() -> String { - const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789"; - let mut rng = rand::thread_rng(); - iter::repeat_with(|| CHARSET[rng.gen_range(0..CHARSET.len())] as char).take(11).collect() - } - - pub async fn choose_server(&mut self) -> Result<(), YouTubeError> { - let server_response: OwnedValue = get_http_client() - .post(Url::parse_with_params(GCM_SIGNALER_SRQE, [("key", LIVE_CHAT_BASE_TANGO_KEY)])?) - .header(header::CONTENT_TYPE, "application/json+protobuf") - .body(format!(r#"[[null,null,null,[7,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]]]]"#, self.topic)) - .send() - .await? - .simd_json() - .await?; - let gsess = server_response.as_array().unwrap()[0].as_str().unwrap(); - self.gsessionid = Some(gsess.to_owned()); - Ok(()) - } - - pub async fn renew_session_or_something(&mut self) -> Result<(), YouTubeError> { - let mut ofs_parameters = HashMap::new(); - ofs_parameters.insert("count", "2".to_string()); - ofs_parameters.insert("ofs", "1".to_string()); - ofs_parameters.insert("req0___data__", format!(r#"[[["{}",null,[]]]]"#, self.session_n)); - self.session_n += 1; - ofs_parameters.insert( - "req1___data__", - format!(r#"[[["{}",[null,null,null,[7,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]],null,null,1],null,3]]]"#, self.session_n, self.topic) - ); - let ofs = get_http_client() - .post(Url::parse_with_params( - GCM_SIGNALER_PSUB, - [ - ("VER", "8"), - ("gsessionid", self.gsessionid.as_ref().unwrap()), - ("key", LIVE_CHAT_BASE_TANGO_KEY), - ("SID", self.sid.as_ref().unwrap()), - ("RID", &self.rid.to_string()), - ("AID", &self.aid.to_string()), - ("CVER", "22"), - ("zx", Self::gen_zx().as_ref()), - ("t", "1") - ] - )?) - .header("X-WebChannel-Content-Type", "application/json+protobuf") - .form(&ofs_parameters) - .send() - .await?; - - let mut ofs_res_line = ofs.bytes().await?.lines().nth(1).unwrap().unwrap(); - println!("{ofs_res_line}"); - let value: OwnedValue = unsafe { simd_json::from_str(&mut ofs_res_line) }?; - let value = value.as_array().unwrap(); - // assert_eq!(value[0].as_usize().unwrap(), 1); - - Ok(()) - } - - pub async fn init_session(&mut self) -> Result<(), YouTubeError> { - let mut ofs_parameters = HashMap::new(); - ofs_parameters.insert("count", "1".to_string()); - ofs_parameters.insert("ofs", "0".to_string()); - ofs_parameters.insert( - "req0___data__", - format!(r#"[[["1",[null,null,null,[7,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]],null,null,1],null,3]]]"#, self.topic) - ); - self.session_n = 1; - let ofs = get_http_client() - .post(Url::parse_with_params( - GCM_SIGNALER_PSUB, - [ - ("VER", "8"), - ("gsessionid", self.gsessionid.as_ref().unwrap()), - ("key", LIVE_CHAT_BASE_TANGO_KEY), - ("RID", &self.rid.to_string()), - ("AID", &self.aid.to_string()), - ("CVER", "22"), - ("zx", Self::gen_zx().as_ref()), - ("t", "1") - ] - )?) - .header("X-WebChannel-Content-Type", "application/json+protobuf") - .form(&ofs_parameters) - .send() - .await?; - - let mut ofs_res_line = ofs.bytes().await?.lines().nth(1).unwrap().unwrap(); - let value: OwnedValue = unsafe { simd_json::from_str(&mut ofs_res_line) }?; - let value = value.as_array().unwrap()[0].as_array().unwrap(); - assert_eq!(value[0].as_usize().unwrap(), 0); - let sid = value[1].as_array().unwrap()[1].as_str().unwrap(); - self.sid = Some(sid.to_owned()); - Ok(()) - } - - pub async fn get_session_stream(&self) -> Result { - Ok(get_http_client() - .get(Url::parse_with_params( - GCM_SIGNALER_PSUB, - [ - ("VER", "8"), - ("gsessionid", self.gsessionid.as_ref().unwrap()), - ("key", LIVE_CHAT_BASE_TANGO_KEY), - ("RID", "rpc"), - ("SID", self.sid.as_ref().unwrap()), - ("AID", &self.aid.to_string()), - ("CI", "0"), - ("TYPE", "xmlhttp"), - ("zx", &Self::gen_zx()), - ("t", "1") - ] - )?) - .header(header::CONNECTION, "keep-alive") - .send() - .await?) - } -} - -#[derive(Debug)] -pub struct SignalerChannel { - inner: Arc> -} - -impl SignalerChannel { - pub async fn new(topic_id: impl ToString) -> Result { - Ok(SignalerChannel { - inner: Arc::new(Mutex::new(SignalerChannelInner::with_topic(topic_id))) - }) - } - - pub async fn new_from_cont(cont: &GetLiveChatResponse) -> Result { - Ok(SignalerChannel { - inner: Arc::new(Mutex::new(SignalerChannelInner::with_topic( - &cont.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0] - .invalidation_continuation_data - .as_ref() - .unwrap() - .invalidation_id - .topic - ))) - }) - } - - pub async fn spawn_event_subscriber(&self) -> Result<(broadcast::Receiver<()>, JoinHandle<()>), YouTubeError> { - let inner = Arc::clone(&self.inner); - { - let mut lock = inner.lock().await; - lock.choose_server().await?; - lock.init_session().await?; - } - let (sender, receiver) = broadcast::channel(128); - let handle = tokio::spawn(async move { - loop { - let mut req = { - let mut lock = inner.lock().await; - lock.reset(); - lock.choose_server().await.unwrap(); - lock.init_session().await.unwrap(); - lock.get_session_stream().await.unwrap() - }; - loop { - match req.chunk().await { - Ok(None) => break, - Ok(Some(s)) => { - let mut ofs_res_line = s.lines().nth(1).unwrap().unwrap(); - println!("{ofs_res_line}"); - if let Ok(s) = unsafe { simd_json::from_str::(ofs_res_line.as_mut()) } { - let a = s.as_array().unwrap(); - { - inner.lock().await.aid = a[a.len() - 1].as_array().unwrap()[0].as_usize().unwrap(); - } - } - } - Err(e) => { - eprintln!("{e:?}"); - break; - } - } - } - } - }); - Ok((receiver, handle)) - } -} diff --git a/src/youtube/signaler.rs b/src/youtube/signaler.rs new file mode 100644 index 0000000..e89f45e --- /dev/null +++ b/src/youtube/signaler.rs @@ -0,0 +1,201 @@ +use std::{collections::HashMap, io::BufRead, iter, sync::Arc}; + +use rand::Rng; +use reqwest::{header, Response}; +use simd_json::{ + base::{ValueAsContainer, ValueAsScalar}, + OwnedValue +}; +use tokio::{ + sync::{broadcast, Mutex}, + task::JoinHandle +}; +use url::Url; + +use super::{types::GetLiveChatResponse, util::SimdJsonResponseBody, YouTubeError}; + +const GCM_SIGNALER_SRQE: &str = "https://signaler-pa.youtube.com/punctual/v1/chooseServer"; +const GCM_SIGNALER_PSUB: &str = "https://signaler-pa.youtube.com/punctual/multi-watch/channel"; + +const LIVE_CHAT_BASE_TANGO_KEY: &str = "AIzaSyDZNkyC-AtROwMBpLfevIvqYk-Gfi8ZOeo"; + +#[derive(Debug, Default)] +struct SignalerChannelInner { + topic: String, + gsessionid: Option, + sid: Option, + rid: usize, + aid: usize, + session_n: usize +} + +impl SignalerChannelInner { + pub fn with_topic(topic: impl ToString) -> Self { + Self { + topic: topic.to_string(), + ..Default::default() + } + } + + pub fn reset(&mut self) { + self.gsessionid = None; + self.sid = None; + self.rid = 0; + self.aid = 0; + self.session_n = 0; + } + + fn gen_zx() -> String { + const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789"; + let mut rng = rand::thread_rng(); + iter::repeat_with(|| CHARSET[rng.gen_range(0..CHARSET.len())] as char).take(11).collect() + } + + pub async fn choose_server(&mut self) -> Result<(), YouTubeError> { + let server_response: OwnedValue = super::get_http_client() + .post(Url::parse_with_params(GCM_SIGNALER_SRQE, [("key", LIVE_CHAT_BASE_TANGO_KEY)])?) + .header(header::CONTENT_TYPE, "application/json+protobuf") + .body(format!(r#"[[null,null,null,[7,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]]]]"#, self.topic)) + .send() + .await? + .simd_json() + .await?; + let gsess = server_response.as_array().unwrap()[0].as_str().unwrap(); + self.gsessionid = Some(gsess.to_owned()); + Ok(()) + } + + pub async fn init_session(&mut self) -> Result<(), YouTubeError> { + let mut ofs_parameters = HashMap::new(); + ofs_parameters.insert("count", "1".to_string()); + ofs_parameters.insert("ofs", "0".to_string()); + ofs_parameters.insert( + "req0___data__", + format!(r#"[[["1",[null,null,null,[7,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]],null,null,1],null,3]]]"#, self.topic) + ); + self.session_n = 1; + let ofs = super::get_http_client() + .post(Url::parse_with_params( + GCM_SIGNALER_PSUB, + [ + ("VER", "8"), + ("gsessionid", self.gsessionid.as_ref().unwrap()), + ("key", LIVE_CHAT_BASE_TANGO_KEY), + ("RID", &self.rid.to_string()), + ("AID", &self.aid.to_string()), + ("CVER", "22"), + ("zx", Self::gen_zx().as_ref()), + ("t", "1") + ] + )?) + .header("X-WebChannel-Content-Type", "application/json+protobuf") + .form(&ofs_parameters) + .send() + .await?; + + let mut ofs_res_line = ofs.bytes().await?.lines().nth(1).unwrap().unwrap(); + let value: OwnedValue = unsafe { simd_json::from_str(&mut ofs_res_line) }?; + let value = value.as_array().unwrap()[0].as_array().unwrap(); + assert_eq!(value[0].as_usize().unwrap(), 0); + let sid = value[1].as_array().unwrap()[1].as_str().unwrap(); + self.sid = Some(sid.to_owned()); + Ok(()) + } + + pub async fn get_session_stream(&self) -> Result { + Ok(super::get_http_client() + .get(Url::parse_with_params( + GCM_SIGNALER_PSUB, + [ + ("VER", "8"), + ("gsessionid", self.gsessionid.as_ref().unwrap()), + ("key", LIVE_CHAT_BASE_TANGO_KEY), + ("RID", "rpc"), + ("SID", self.sid.as_ref().unwrap()), + ("AID", &self.aid.to_string()), + ("CI", "0"), + ("TYPE", "xmlhttp"), + ("zx", &Self::gen_zx()), + ("t", "1") + ] + )?) + .header(header::CONNECTION, "keep-alive") + .send() + .await?) + } +} + +#[derive(Debug)] +pub struct SignalerChannel { + inner: Arc> +} + +impl SignalerChannel { + pub async fn new(topic_id: impl ToString) -> Result { + Ok(SignalerChannel { + inner: Arc::new(Mutex::new(SignalerChannelInner::with_topic(topic_id))) + }) + } + + pub async fn new_from_cont(cont: &GetLiveChatResponse) -> Result { + Ok(SignalerChannel { + inner: Arc::new(Mutex::new(SignalerChannelInner::with_topic( + &cont.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0] + .invalidation_continuation_data + .as_ref() + .unwrap() + .invalidation_id + .topic + ))) + }) + } + + pub async fn refresh_topic(&self, topic: impl ToString) { + self.inner.lock().await.topic = topic.to_string(); + } + + pub async fn spawn_event_subscriber(&self) -> Result<(broadcast::Receiver<()>, JoinHandle<()>), YouTubeError> { + let inner = Arc::clone(&self.inner); + { + let mut lock = inner.lock().await; + lock.choose_server().await?; + lock.init_session().await?; + } + let (sender, receiver) = broadcast::channel(128); + let handle = tokio::spawn(async move { + 'i: loop { + let mut req = { + let mut lock = inner.lock().await; + let _ = sender.send(()); + lock.reset(); + lock.choose_server().await.unwrap(); + lock.init_session().await.unwrap(); + lock.get_session_stream().await.unwrap() + }; + loop { + match req.chunk().await { + Ok(None) => break, + Ok(Some(s)) => { + let mut ofs_res_line = s.lines().nth(1).unwrap().unwrap(); + if let Ok(s) = unsafe { simd_json::from_str::(ofs_res_line.as_mut()) } { + let a = s.as_array().unwrap(); + { + inner.lock().await.aid = a[a.len() - 1].as_array().unwrap()[0].as_usize().unwrap(); + } + } + + if sender.send(()).is_err() { + break 'i; + } + } + Err(e) => { + eprintln!("{e:?}"); + break; + } + } + } + } + }); + Ok((receiver, handle)) + } +} diff --git a/src/youtube/types.rs b/src/youtube/types.rs index 182b1b8..9458493 100644 --- a/src/youtube/types.rs +++ b/src/youtube/types.rs @@ -124,6 +124,21 @@ pub enum MessageRun { } } +impl ToString for MessageRun { + fn to_string(&self) -> String { + match self { + Self::MessageText { text } => text.to_owned(), + Self::MessageEmoji { emoji, .. } => { + if let Some(true) = emoji.is_custom_emoji { + format!(":{}:", emoji.image.accessibility.accessibility_data.label) + } else { + emoji.image.accessibility.accessibility_data.label.to_owned() + } + } + } + } +} + #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Emoji {