Skip to content

Commit

Permalink
pool: better control over the handling of the termination request
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Jan 24, 2025
1 parent 9c87cb3 commit 3da8406
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
* pool: refactor relay removal logic and add unit tests ([Yuki Kishimoto])
* pool: handle `close` WebSocket message ([Yuki Kishimoto])
* pool: always close WebSocket connection when handlers terminate ([Yuki Kishimoto])
* pool: better control over the handling of the termination request ([Yuki Kishimoto])
* lmdb: use `EventBorrow` instead of `DatabaseEvent` ([Yuki Kishimoto])
* ndb: refactor note-to-event conversion ([Yuki Kishimoto])
* relay-builder: refactor shutdown mechanism to use `Notify` over `broadcast` ([Yuki Kishimoto])
Expand Down
3 changes: 3 additions & 0 deletions crates/nostr-relay-pool/src/relay/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub enum Error {
NotReady,
/// Relay not connected
NotConnected,
/// Received termination request
TerminationRequest,
/// Received shutdown
ReceivedShutdown,
/// Relay message
Expand Down Expand Up @@ -146,6 +148,7 @@ impl fmt::Display for Error {
}
Self::NotReady => write!(f, "relay is initialized but not ready"),
Self::NotConnected => write!(f, "relay not connected"),
Self::TerminationRequest => write!(f, "received termination request"),
Self::ReceivedShutdown => write!(f, "received shutdown"),
Self::RelayMessage(message) => write!(f, "{message}"),
Self::BatchMessagesEmpty => write!(f, "can't batch empty list of messages"),
Expand Down
42 changes: 27 additions & 15 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ use std::time::Duration;

use async_utility::{task, time};
use async_wsocket::futures_util::{self, SinkExt, StreamExt};
use async_wsocket::{
connect as wsocket_connect, ConnectionMode, Error as WsError, Sink, Stream, WsMessage,
};
use async_wsocket::{connect as wsocket_connect, ConnectionMode, Sink, Stream, WsMessage};
use atomic_destructor::AtomicDestroyer;
use negentropy::{Bytes, Id, Negentropy, NegentropyStorageVector};
use negentropy_deprecated::{Bytes as BytesDeprecated, Negentropy as NegentropyDeprecated};
Expand Down Expand Up @@ -413,12 +411,11 @@ impl InnerRelay {
// TODO: if the relay score is too low, immediately exit.
// TODO: at every loop iteration check the score and if it's too low, exit

tokio::select! {
// Connect and run message handler
_ = relay.connect_and_run(stream, &mut rx_nostr, &mut last_ws_error) => {},
// Handle "termination notification
_ = relay.handle_terminate() => break,
}
// Connect and run message handler
// The termination requests are handled inside this method!
relay
.connect_and_run(stream, &mut rx_nostr, &mut last_ws_error)
.await;

// Update stream to `None`, meaning that it was already used (if was some).
stream = None;
Expand Down Expand Up @@ -447,10 +444,12 @@ impl InnerRelay {
interval.as_secs()
);

// Sleep before retry to connect
// Handle termination to allow to exit immediately if request is received during the sleep.
tokio::select! {
// Sleep
_ = time::sleep(interval) => {},
// Handle "termination notification
// Handle termination notification
_ = relay.handle_terminate() => break,
}
} else {
Expand Down Expand Up @@ -521,16 +520,20 @@ impl InnerRelay {
&self,
timeout: Duration,
status_on_failure: RelayStatus,
) -> Result<(Sink, Stream), WsError> {
) -> Result<(Sink, Stream), Error> {
// Update status
self.set_status(RelayStatus::Connecting, true);

// Add attempt
self.stats.new_attempt();

// Connect
match wsocket_connect((&self.url).into(), &self.opts.connection_mode, timeout).await {
Ok((ws_tx, ws_rx)) => {
// Try to connect
// If during connection the termination request is received, abort the connection and return error.
// At this stem is NOT required to close the WebSocket connection.
tokio::select! {
// Connect
res = wsocket_connect((&self.url).into(), &self.opts.connection_mode, timeout) => match res {
Ok((ws_tx, ws_rx)) => {
// Update status
self.set_status(RelayStatus::Connected, true);

Expand All @@ -544,8 +547,11 @@ impl InnerRelay {
self.set_status(status_on_failure, false);

// Return error
Err(e)
Err(Error::WebSocket(e))
}
},
// Handle termination notification
_ = self.handle_terminate() => Err(Error::TerminationRequest),
}
}

Expand Down Expand Up @@ -614,15 +620,21 @@ impl InnerRelay {
let ping: PingTracker = PingTracker::default();

// Wait that one of the futures terminates/completes
// Also also termination here, to allow to close the connection in case of termination request.
tokio::select! {
// Message receiver handler
res = self.receiver_message_handler(ws_rx, &ping) => match res {
Ok(()) => tracing::trace!(url = %self.url, "Relay received exited."),
Err(e) => tracing::error!(url = %self.url, error = %e, "Relay receiver exited with error.")
},
// Message sender handler
res = self.sender_message_handler(&mut ws_tx, rx_nostr, &ping) => match res {
Ok(()) => tracing::trace!(url = %self.url, "Relay sender exited."),
Err(e) => tracing::error!(url = %self.url, error = %e, "Relay sender exited with error.")
},
// Termination handler
_ = self.handle_terminate() => {},
// Pinger
_ = self.pinger() => {}
}

Expand Down
28 changes: 28 additions & 0 deletions crates/nostr-relay-pool/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,34 @@ mod tests {
assert!(!relay.inner.is_running());
}

#[tokio::test]
async fn test_disconnect_unresponsive_during_try_connect() {
// Mock relay
let opts = RelayTestOptions {
unresponsive_connection: Some(Duration::from_secs(10)),
};
let mock = MockRelay::run_with_opts(opts).await.unwrap();
let url = RelayUrl::parse(&mock.url()).unwrap();

let relay = Relay::new(url);

assert_eq!(relay.status(), RelayStatus::Initialized);

// Terminate after 3 secs
let r = relay.clone();
tokio::spawn(async move {
time::sleep(Duration::from_secs(3)).await;
r.disconnect();
});

let res = relay.try_connect(Duration::from_secs(7)).await;
assert!(matches!(res.unwrap_err(), Error::TerminationRequest));

assert_eq!(relay.status(), RelayStatus::Terminated);

assert!(!relay.inner.is_running());
}

#[tokio::test]
async fn test_wait_for_connection() {
// Mock relay
Expand Down

0 comments on commit 3da8406

Please sign in to comment.