Skip to content

Commit

Permalink
do not panic on stream end
Browse files Browse the repository at this point in the history
  • Loading branch information
decahedron1 committed Apr 4, 2024
1 parent 45627ef commit e55588c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
25 changes: 12 additions & 13 deletions src/youtube/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,29 +66,29 @@ unsafe impl<'r> Send for ActionChunk<'r> {}

impl<'r> ActionChunk<'r> {
pub fn new(response: GetLiveChatResponse, ctx: &'r ChatContext) -> Result<Self, Error> {
let continuation_token = match &response.continuation_contents.live_chat_continuation.continuations[0] {
let continuation_contents = response.continuation_contents.ok_or(Error::EndOfContinuation)?;

let continuation_token = match &continuation_contents.live_chat_continuation.continuations[0] {
Continuation::Invalidation { continuation, .. } => continuation.to_owned(),
Continuation::Timed { continuation, .. } => continuation.to_owned(),
Continuation::Replay { continuation, .. } => continuation.to_owned(),
Continuation::PlayerSeek { .. } => return Err(Error::EndOfContinuation)
};
let signaler_topic = match &response.continuation_contents.live_chat_continuation.continuations[0] {
let signaler_topic = match &continuation_contents.live_chat_continuation.continuations[0] {
Continuation::Invalidation { invalidation_id, .. } => Some(invalidation_id.topic.to_owned()),
_ => None
};
Ok(Self {
actions: if ctx.live_status.updates_live() {
response
.continuation_contents
continuation_contents
.live_chat_continuation
.actions
.unwrap_or_default()
.into_iter()
.map(|f| f.action)
.collect()
} else {
response
.continuation_contents
continuation_contents
.live_chat_continuation
.actions
.ok_or(Error::EndOfContinuation)?
Expand All @@ -109,14 +109,13 @@ impl<'r> ActionChunk<'r> {
self.actions.iter()
}

async fn next_page(&self, continuation_token: &String) -> Result<Self, Error> {
let page = GetLiveChatResponse::fetch(self.ctx, continuation_token).await?;
ActionChunk::new(page, self.ctx)
}

pub async fn cont(&self) -> Option<Result<Self, Error>> {
if let Some(continuation_token) = &self.continuation_token {
Some(self.next_page(continuation_token).await)
let page = match GetLiveChatResponse::fetch(self.ctx, continuation_token).await {
Err(e) => return Some(Err(e)),
Ok(page) => page
};
if page.continuation_contents.is_some() { Some(ActionChunk::new(page, self.ctx)) } else { None }
} else {
None
}
Expand All @@ -140,7 +139,7 @@ pub async fn stream(options: &ChatContext) -> Result<Pin<Box<impl Stream<Item =
Ok(Box::pin(async_stream::__private::AsyncStream::new(yield_rx, async move {
let mut seen_messages = HashSet::new();

match &initial_chat.continuation_contents.live_chat_continuation.continuations[0] {
match &initial_chat.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0] {
Continuation::Invalidation { invalidation_id, .. } => {
let topic = invalidation_id.topic.to_owned();

Expand Down
2 changes: 1 addition & 1 deletion src/youtube/types/get_live_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct GetLiveChatRequestBodyContextClient {
#[serde(rename_all = "camelCase")]
pub struct GetLiveChatResponse {
pub response_context: Option<simd_json::OwnedValue>,
pub continuation_contents: GetLiveChatResponseContinuationContents
pub continuation_contents: Option<GetLiveChatResponseContinuationContents>
}

impl GetLiveChatResponse {
Expand Down

0 comments on commit e55588c

Please sign in to comment.