Skip to content

Commit

Permalink
Merge pull request #1144 from rigby-dane/add_row_description
Browse files Browse the repository at this point in the history
Adds RowDescription to the SimpleQueryMessage
  • Loading branch information
sfackler authored Jul 9, 2024
2 parents ded5e7d + 2647024 commit 647a925
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 31 deletions.
5 changes: 4 additions & 1 deletion tokio-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub use crate::generic_client::GenericClient;
pub use crate::portal::Portal;
pub use crate::query::RowStream;
pub use crate::row::{Row, SimpleQueryRow};
pub use crate::simple_query::SimpleQueryStream;
pub use crate::simple_query::{SimpleColumn, SimpleQueryStream};
#[cfg(feature = "runtime")]
pub use crate::socket::Socket;
pub use crate::statement::{Column, Statement};
Expand All @@ -141,6 +141,7 @@ pub use crate::to_statement::ToStatement;
pub use crate::transaction::Transaction;
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
use crate::types::ToSql;
use std::sync::Arc;

pub mod binary_copy;
mod bind;
Expand Down Expand Up @@ -248,6 +249,8 @@ pub enum SimpleQueryMessage {
///
/// The number of rows modified or selected is returned.
CommandComplete(u64),
/// Column values of the proceeding row values
RowDescription(Arc<[SimpleColumn]>),
}

fn slice_iter<'a>(
Expand Down
53 changes: 26 additions & 27 deletions tokio-postgres/src/simple_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,35 +85,34 @@ impl Stream for SimpleQueryStream {

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
loop {
match ready!(this.responses.poll_next(cx)?) {
Message::CommandComplete(body) => {
let rows = extract_row_affected(&body)?;
return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))));
}
Message::EmptyQueryResponse => {
return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))));
}
Message::RowDescription(body) => {
let columns = body
.fields()
.map(|f| Ok(SimpleColumn::new(f.name().to_string())))
.collect::<Vec<_>>()
.map_err(Error::parse)?
.into();
match ready!(this.responses.poll_next(cx)?) {
Message::CommandComplete(body) => {
let rows = extract_row_affected(&body)?;
Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))))
}
Message::EmptyQueryResponse => {
Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))))
}
Message::RowDescription(body) => {
let columns: Arc<[SimpleColumn]> = body
.fields()
.map(|f| Ok(SimpleColumn::new(f.name().to_string())))
.collect::<Vec<_>>()
.map_err(Error::parse)?
.into();

*this.columns = Some(columns);
}
Message::DataRow(body) => {
let row = match &this.columns {
Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
};
return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))));
}
Message::ReadyForQuery(_) => return Poll::Ready(None),
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
*this.columns = Some(columns.clone());
Poll::Ready(Some(Ok(SimpleQueryMessage::RowDescription(columns))))
}
Message::DataRow(body) => {
let row = match &this.columns {
Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
};
Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))))
}
Message::ReadyForQuery(_) => Poll::Ready(None),
_ => Poll::Ready(Some(Err(Error::unexpected_message()))),
}
}
}
13 changes: 10 additions & 3 deletions tokio-postgres/tests/test/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,13 @@ async fn simple_query() {
_ => panic!("unexpected message"),
}
match &messages[2] {
SimpleQueryMessage::RowDescription(columns) => {
assert_eq!(columns.get(0).map(|c| c.name()), Some("id"));
assert_eq!(columns.get(1).map(|c| c.name()), Some("name"));
}
_ => panic!("unexpected message"),
}
match &messages[3] {
SimpleQueryMessage::Row(row) => {
assert_eq!(row.columns().get(0).map(|c| c.name()), Some("id"));
assert_eq!(row.columns().get(1).map(|c| c.name()), Some("name"));
Expand All @@ -336,7 +343,7 @@ async fn simple_query() {
}
_ => panic!("unexpected message"),
}
match &messages[3] {
match &messages[4] {
SimpleQueryMessage::Row(row) => {
assert_eq!(row.columns().get(0).map(|c| c.name()), Some("id"));
assert_eq!(row.columns().get(1).map(|c| c.name()), Some("name"));
Expand All @@ -345,11 +352,11 @@ async fn simple_query() {
}
_ => panic!("unexpected message"),
}
match messages[4] {
match messages[5] {
SimpleQueryMessage::CommandComplete(2) => {}
_ => panic!("unexpected message"),
}
assert_eq!(messages.len(), 5);
assert_eq!(messages.len(), 6);
}

#[tokio::test]
Expand Down

0 comments on commit 647a925

Please sign in to comment.