Skip to content

Commit

Permalink
implement Stream for Responses
Browse files Browse the repository at this point in the history
Signed-off-by: Petros Angelatos <[email protected]>
  • Loading branch information
petrosagg committed May 27, 2021
1 parent aefa11c commit 8423d6d
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ use crate::{
use bytes::{Buf, BytesMut};
use fallible_iterator::FallibleIterator;
use futures::channel::mpsc;
use futures::{future, pin_mut, ready, StreamExt, TryStreamExt};
use futures::{future, pin_mut, ready, Stream, StreamExt, TryStreamExt};
use parking_lot::Mutex;
use postgres_protocol::message::backend::Message;
use postgres_types::BorrowToSql;
use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
Expand Down Expand Up @@ -54,6 +55,17 @@ impl Responses {
}
}

impl Stream for Responses {
type Item = Result<Message, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!((*self).poll_next(cx)) {
Err(err) if err.is_closed() => Poll::Ready(None),
msg => Poll::Ready(Some(msg)),
}
}
}

/// A cache of type info and prepared statements for fetching type info
/// (corresponding to the queries in the [prepare](prepare) module).
#[derive(Default)]
Expand Down

0 comments on commit 8423d6d

Please sign in to comment.