Skip to content

Commit

Permalink
Use muxed inbox to handle jetstream publishes
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini authored and Jarema committed Oct 7, 2023
1 parent 0667d72 commit 4222b83
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 27 deletions.
2 changes: 1 addition & 1 deletion async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
pub struct Client {
info: tokio::sync::watch::Receiver<ServerInfo>,
pub(crate) state: tokio::sync::watch::Receiver<State>,
sender: mpsc::Sender<Command>,
pub(crate) sender: mpsc::Sender<Command>,
next_subscription_id: Arc<AtomicU64>,
subscription_capacity: usize,
inbox_prefix: String,
Expand Down
52 changes: 26 additions & 26 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use crate::jetstream::account::Account;
use crate::jetstream::publish::PublishAck;
use crate::jetstream::response::Response;
use crate::subject::Subject;
use crate::{header, Client, HeaderMap, HeaderValue, StatusCode};
use crate::{header, Client, Command, HeaderMap, HeaderValue, Message, StatusCode};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{Future, StreamExt, TryFutureExt};
use futures::{Future, TryFutureExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::{self, json};
Expand All @@ -34,6 +34,7 @@ use std::pin::Pin;
use std::str::from_utf8;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::oneshot;
use tracing::debug;

use super::consumer::{Consumer, FromConsumer, IntoConsumerConfig};
Expand Down Expand Up @@ -189,30 +190,29 @@ impl Context {
subject: Subject,
publish: Publish,
) -> Result<PublishAckFuture, PublishError> {
let inbox = Subject::from(self.client.new_inbox());
let response = self
let (sender, receiver) = oneshot::channel();

let respond = self.client.new_inbox().into();

let send_fut = self
.client
.subscribe(inbox.clone())
.await
.map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
tokio::time::timeout(self.timeout, async {
if let Some(headers) = publish.headers {
self.client
.publish_with_reply_and_headers(subject, inbox, headers, publish.payload)
.await
} else {
self.client
.publish_with_reply(subject, inbox.clone(), publish.payload)
.await
}
})
.map_err(|_| PublishError::new(PublishErrorKind::TimedOut))
.await?
.map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
.sender
.send(Command::Request {
subject,
payload: publish.payload,
respond,
headers: publish.headers,
sender,
})
.map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));

tokio::time::timeout(self.timeout, send_fut)
.map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
.await??;

Ok(PublishAckFuture {
timeout: self.timeout,
subscription: response,
subscription: receiver,
})
}

Expand Down Expand Up @@ -973,16 +973,16 @@ pub type PublishError = Error<PublishErrorKind>;
#[derive(Debug)]
pub struct PublishAckFuture {
timeout: Duration,
subscription: crate::Subscriber,
subscription: oneshot::Receiver<Message>,
}

impl PublishAckFuture {
async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
let next = tokio::time::timeout(self.timeout, self.subscription.next())
async fn next_with_timeout(self) -> Result<PublishAck, PublishError> {
let next = tokio::time::timeout(self.timeout, self.subscription)
.await
.map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
next.map_or_else(
|| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
|_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
|m| {
if m.status == Some(StatusCode::NO_RESPONDERS) {
return Err(PublishError::new(PublishErrorKind::StreamNotFound));
Expand Down

0 comments on commit 4222b83

Please sign in to comment.