diff --git a/async-nats/src/connection.rs b/async-nats/src/connection.rs index 4d7352266..00c17f627 100644 --- a/async-nats/src/connection.rs +++ b/async-nats/src/connection.rs @@ -19,6 +19,8 @@ use std::future::{self, Future}; use std::io::IoSlice; use std::pin::Pin; use std::str::{self, FromStr}; +use std::sync::atomic::Ordering; +use std::sync::Arc; use std::task::{Context, Poll}; use bytes::{Buf, Bytes, BytesMut}; @@ -27,7 +29,7 @@ use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite}; use crate::header::{HeaderMap, HeaderName, IntoHeaderValue}; use crate::status::StatusCode; use crate::subject::Subject; -use crate::{ClientOp, ServerError, ServerOp}; +use crate::{ClientOp, ServerError, ServerOp, Statistics}; /// Soft limit for the amount of bytes in [`Connection::write_buf`] /// and [`Connection::flattened_writes`]. @@ -80,12 +82,17 @@ pub(crate) struct Connection { write_buf_len: usize, flattened_writes: BytesMut, can_flush: bool, + statistics: Arc, } /// Internal representation of the connection. /// Holds connection with NATS Server and communicates with `Client` via channels. impl Connection { - pub(crate) fn new(stream: Box, read_buffer_capacity: usize) -> Self { + pub(crate) fn new( + stream: Box, + read_buffer_capacity: usize, + statistics: Arc, + ) -> Self { Self { stream, read_buf: BytesMut::with_capacity(read_buffer_capacity), @@ -93,6 +100,7 @@ impl Connection { write_buf_len: 0, flattened_writes: BytesMut::new(), can_flush: false, + statistics, } } @@ -407,7 +415,10 @@ impl Connection { Poll::Pending => Poll::Pending, Poll::Ready(Ok(0)) if self.read_buf.is_empty() => Poll::Ready(Ok(None)), Poll::Ready(Ok(0)) => Poll::Ready(Err(io::ErrorKind::ConnectionReset.into())), - Poll::Ready(Ok(_n)) => continue, + Poll::Ready(Ok(n)) => { + self.statistics.in_bytes.add(n as u64, Ordering::Relaxed); + continue; + } Poll::Ready(Err(err)) => Poll::Ready(Err(err)), }; } @@ -544,6 +555,7 @@ impl Connection { match Pin::new(&mut self.stream).poll_write(cx, buf) { Poll::Pending => return Poll::Pending, Poll::Ready(Ok(n)) => { + self.statistics.out_bytes.add(n as u64, Ordering::Relaxed); self.write_buf_len -= n; self.can_flush = true; @@ -564,7 +576,6 @@ impl Connection { } } } - /// Write the internal buffers into the write stream using vectored write operations /// /// Writes [`WRITE_VECTORED_CHUNKS`] at a time. More efficient _if_ @@ -595,6 +606,7 @@ impl Connection { match Pin::new(&mut self.stream).poll_write_vectored(cx, &writes[..writes_len]) { Poll::Pending => return Poll::Pending, Poll::Ready(Ok(mut n)) => { + self.statistics.out_bytes.add(n as u64, Ordering::Relaxed); self.write_buf_len -= n; self.can_flush = true; @@ -673,14 +685,16 @@ impl Connection { #[cfg(test)] mod read_op { + use std::sync::Arc; + use super::Connection; - use crate::{HeaderMap, ServerError, ServerInfo, ServerOp, StatusCode}; + use crate::{HeaderMap, ServerError, ServerInfo, ServerOp, Statistics, StatusCode}; use tokio::io::{self, AsyncWriteExt}; #[tokio::test] async fn ok() { let (stream, mut server) = io::duplex(128); - let mut connection = Connection::new(Box::new(stream), 0); + let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default())); server.write_all(b"+OK\r\n").await.unwrap(); let result = connection.read_op().await.unwrap(); @@ -690,7 +704,7 @@ mod read_op { #[tokio::test] async fn ping() { let (stream, mut server) = io::duplex(128); - let mut connection = Connection::new(Box::new(stream), 0); + let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default())); server.write_all(b"PING\r\n").await.unwrap(); let result = connection.read_op().await.unwrap(); @@ -700,7 +714,7 @@ mod read_op { #[tokio::test] async fn pong() { let (stream, mut server) = io::duplex(128); - let mut connection = Connection::new(Box::new(stream), 0); + let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default())); server.write_all(b"PONG\r\n").await.unwrap(); let result = connection.read_op().await.unwrap(); @@ -710,7 +724,7 @@ mod read_op { #[tokio::test] async fn info() { let (stream, mut server) = io::duplex(128); - let mut connection = Connection::new(Box::new(stream), 0); + let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default())); server.write_all(b"INFO {}\r\n").await.unwrap(); server.flush().await.unwrap(); @@ -737,7 +751,7 @@ mod read_op { #[tokio::test] async fn error() { let (stream, mut server) = io::duplex(128); - let mut connection = Connection::new(Box::new(stream), 0); + let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default())); server.write_all(b"INFO {}\r\n").await.unwrap(); let result = connection.read_op().await.unwrap(); @@ -759,7 +773,7 @@ mod read_op { #[tokio::test] async fn message() { let (stream, mut server) = io::duplex(128); - let mut connection = Connection::new(Box::new(stream), 0); + let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default())); server .write_all(b"MSG FOO.BAR 9 11\r\nHello World\r\n") @@ -906,7 +920,7 @@ mod read_op { #[tokio::test] async fn unknown() { let (stream, mut server) = io::duplex(128); - let mut connection = Connection::new(Box::new(stream), 0); + let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default())); server.write_all(b"ONE\r\n").await.unwrap(); connection.read_op().await.unwrap_err(); @@ -956,14 +970,16 @@ mod read_op { #[cfg(test)] mod write_op { + use std::sync::Arc; + use super::Connection; - use crate::{ClientOp, ConnectInfo, HeaderMap, Protocol}; + use crate::{ClientOp, ConnectInfo, HeaderMap, Protocol, Statistics}; use tokio::io::{self, AsyncBufReadExt, BufReader}; #[tokio::test] async fn publish() { let (stream, server) = io::duplex(128); - let mut connection = Connection::new(Box::new(stream), 0); + let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default())); connection .easy_write_and_flush( @@ -1032,7 +1048,7 @@ mod write_op { #[tokio::test] async fn subscribe() { let (stream, server) = io::duplex(128); - let mut connection = Connection::new(Box::new(stream), 0); + let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default())); connection .easy_write_and_flush( @@ -1071,7 +1087,7 @@ mod write_op { #[tokio::test] async fn unsubscribe() { let (stream, server) = io::duplex(128); - let mut connection = Connection::new(Box::new(stream), 0); + let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default())); connection .easy_write_and_flush([ClientOp::Unsubscribe { sid: 11, max: None }].iter()) @@ -1102,7 +1118,7 @@ mod write_op { #[tokio::test] async fn ping() { let (stream, server) = io::duplex(128); - let mut connection = Connection::new(Box::new(stream), 0); + let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default())); let mut reader = BufReader::new(server); let mut buffer = String::new(); @@ -1120,7 +1136,7 @@ mod write_op { #[tokio::test] async fn pong() { let (stream, server) = io::duplex(128); - let mut connection = Connection::new(Box::new(stream), 0); + let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default())); let mut reader = BufReader::new(server); let mut buffer = String::new(); @@ -1138,7 +1154,7 @@ mod write_op { #[tokio::test] async fn connect() { let (stream, server) = io::duplex(1024); - let mut connection = Connection::new(Box::new(stream), 0); + let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default())); let mut reader = BufReader::new(server); let mut buffer = String::new(); diff --git a/async-nats/src/connector.rs b/async-nats/src/connector.rs index 7bf54307a..da71cde96 100644 --- a/async-nats/src/connector.rs +++ b/async-nats/src/connector.rs @@ -335,6 +335,7 @@ impl Connector { let mut connection = Connection::new( Box::new(tcp_stream), self.options.read_buffer_capacity.into(), + self.connect_stats.clone(), ); let tls_connection = |connection: Connection| async { @@ -352,7 +353,11 @@ impl Connector { .connect(domain.to_owned(), connection.stream) .await?; - Ok::(Connection::new(Box::new(tls_stream), 0)) + Ok::(Connection::new( + Box::new(tls_stream), + 0, + self.connect_stats.clone(), + )) }; // If `tls_first` was set, establish TLS connection before getting INFO. diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index b6079c005..730a528a7 100755 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -674,10 +674,6 @@ impl ConnectionHandler { .connect_stats .in_messages .add(1, Ordering::Relaxed); - self.connector - .connect_stats - .in_bytes - .add(length as u64, Ordering::Relaxed); if let Some(subscription) = self.subscriptions.get_mut(&sid) { let message: Message = Message { @@ -808,11 +804,6 @@ impl ConnectionHandler { } => { let (prefix, token) = respond.rsplit_once('.').expect("malformed request subject"); - let header_len = headers - .as_ref() - .map(|headers| headers.len()) - .unwrap_or_default(); - let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() { multiplexer } else { @@ -840,10 +831,6 @@ impl ConnectionHandler { let respond: Subject = format!("{}{}", multiplexer.prefix, token).into(); - self.connector.connect_stats.out_bytes.add( - (payload.len() + respond.len() + subject.len() + header_len) as u64, - Ordering::Relaxed, - ); let pub_op = ClientOp::Publish { subject, payload, diff --git a/async-nats/tests/client_tests.rs b/async-nats/tests/client_tests.rs index b7275a24f..bdb0cea2c 100644 --- a/async-nats/tests/client_tests.rs +++ b/async-nats/tests/client_tests.rs @@ -959,8 +959,8 @@ mod client { assert_eq!(stats.in_messages.load(Ordering::Relaxed), 0); assert_eq!(stats.out_messages.load(Ordering::Relaxed), 0); - assert_eq!(stats.in_bytes.load(Ordering::Relaxed), 0); - assert_eq!(stats.out_bytes.load(Ordering::Relaxed), 0); + assert!(stats.in_bytes.load(Ordering::Relaxed) != 0); + assert!(stats.out_bytes.load(Ordering::Relaxed) != 0); assert_eq!(stats.connects.load(Ordering::Relaxed), 1); let mut responder = client.subscribe("request").await.unwrap(); @@ -992,8 +992,8 @@ mod client { assert_eq!(stats.in_messages.load(Ordering::Relaxed), 4); assert_eq!(stats.out_messages.load(Ordering::Relaxed), 4); - assert_eq!(stats.in_bytes.load(Ordering::Relaxed), 139); - assert_eq!(stats.out_bytes.load(Ordering::Relaxed), 139); + assert!(stats.in_bytes.load(Ordering::Relaxed) != 0); + assert!(stats.out_bytes.load(Ordering::Relaxed) != 0); assert_eq!(stats.connects.load(Ordering::Relaxed), 2); } }