diff --git a/CHANGELOG.md b/CHANGELOG.md index 61635d711..6e7aabfee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,6 +89,7 @@ * pool: refactor relay removal logic and add unit tests ([Yuki Kishimoto]) * pool: handle `close` WebSocket message ([Yuki Kishimoto]) * pool: always close WebSocket connection when handlers terminate ([Yuki Kishimoto]) +* pool: better control over the handling of the termination request ([Yuki Kishimoto]) * lmdb: use `EventBorrow` instead of `DatabaseEvent` ([Yuki Kishimoto]) * ndb: refactor note-to-event conversion ([Yuki Kishimoto]) * relay-builder: refactor shutdown mechanism to use `Notify` over `broadcast` ([Yuki Kishimoto]) diff --git a/crates/nostr-relay-pool/src/relay/error.rs b/crates/nostr-relay-pool/src/relay/error.rs index a9ba57b2b..43fdf0c79 100644 --- a/crates/nostr-relay-pool/src/relay/error.rs +++ b/crates/nostr-relay-pool/src/relay/error.rs @@ -60,6 +60,8 @@ pub enum Error { NotReady, /// Relay not connected NotConnected, + /// Received termination request + TerminationRequest, /// Received shutdown ReceivedShutdown, /// Relay message @@ -146,6 +148,7 @@ impl fmt::Display for Error { } Self::NotReady => write!(f, "relay is initialized but not ready"), Self::NotConnected => write!(f, "relay not connected"), + Self::TerminationRequest => write!(f, "received termination request"), Self::ReceivedShutdown => write!(f, "received shutdown"), Self::RelayMessage(message) => write!(f, "{message}"), Self::BatchMessagesEmpty => write!(f, "can't batch empty list of messages"), diff --git a/crates/nostr-relay-pool/src/relay/inner.rs b/crates/nostr-relay-pool/src/relay/inner.rs index b26947122..aa1392faa 100644 --- a/crates/nostr-relay-pool/src/relay/inner.rs +++ b/crates/nostr-relay-pool/src/relay/inner.rs @@ -12,9 +12,7 @@ use std::time::Duration; use async_utility::{task, time}; use async_wsocket::futures_util::{self, SinkExt, StreamExt}; -use async_wsocket::{ - connect as wsocket_connect, ConnectionMode, Error as WsError, Sink, Stream, WsMessage, -}; +use async_wsocket::{connect as wsocket_connect, ConnectionMode, Sink, Stream, WsMessage}; use atomic_destructor::AtomicDestroyer; use negentropy::{Bytes, Id, Negentropy, NegentropyStorageVector}; use negentropy_deprecated::{Bytes as BytesDeprecated, Negentropy as NegentropyDeprecated}; @@ -413,12 +411,11 @@ impl InnerRelay { // TODO: if the relay score is too low, immediately exit. // TODO: at every loop iteration check the score and if it's too low, exit - tokio::select! { - // Connect and run message handler - _ = relay.connect_and_run(stream, &mut rx_nostr, &mut last_ws_error) => {}, - // Handle "termination notification - _ = relay.handle_terminate() => break, - } + // Connect and run message handler + // The termination requests are handled inside this method! + relay + .connect_and_run(stream, &mut rx_nostr, &mut last_ws_error) + .await; // Update stream to `None`, meaning that it was already used (if was some). stream = None; @@ -447,10 +444,12 @@ impl InnerRelay { interval.as_secs() ); + // Sleep before retry to connect + // Handle termination to allow to exit immediately if request is received during the sleep. tokio::select! { // Sleep _ = time::sleep(interval) => {}, - // Handle "termination notification + // Handle termination notification _ = relay.handle_terminate() => break, } } else { @@ -521,16 +520,20 @@ impl InnerRelay { &self, timeout: Duration, status_on_failure: RelayStatus, - ) -> Result<(Sink, Stream), WsError> { + ) -> Result<(Sink, Stream), Error> { // Update status self.set_status(RelayStatus::Connecting, true); // Add attempt self.stats.new_attempt(); - // Connect - match wsocket_connect((&self.url).into(), &self.opts.connection_mode, timeout).await { - Ok((ws_tx, ws_rx)) => { + // Try to connect + // If during connection the termination request is received, abort the connection and return error. + // At this stem is NOT required to close the WebSocket connection. + tokio::select! { + // Connect + res = wsocket_connect((&self.url).into(), &self.opts.connection_mode, timeout) => match res { + Ok((ws_tx, ws_rx)) => { // Update status self.set_status(RelayStatus::Connected, true); @@ -544,8 +547,11 @@ impl InnerRelay { self.set_status(status_on_failure, false); // Return error - Err(e) + Err(Error::WebSocket(e)) } + }, + // Handle termination notification + _ = self.handle_terminate() => Err(Error::TerminationRequest), } } @@ -614,15 +620,21 @@ impl InnerRelay { let ping: PingTracker = PingTracker::default(); // Wait that one of the futures terminates/completes + // Also also termination here, to allow to close the connection in case of termination request. tokio::select! { + // Message receiver handler res = self.receiver_message_handler(ws_rx, &ping) => match res { Ok(()) => tracing::trace!(url = %self.url, "Relay received exited."), Err(e) => tracing::error!(url = %self.url, error = %e, "Relay receiver exited with error.") }, + // Message sender handler res = self.sender_message_handler(&mut ws_tx, rx_nostr, &ping) => match res { Ok(()) => tracing::trace!(url = %self.url, "Relay sender exited."), Err(e) => tracing::error!(url = %self.url, error = %e, "Relay sender exited with error.") }, + // Termination handler + _ = self.handle_terminate() => {}, + // Pinger _ = self.pinger() => {} } diff --git a/crates/nostr-relay-pool/src/relay/mod.rs b/crates/nostr-relay-pool/src/relay/mod.rs index e4be6d4fa..1f9d3c38b 100644 --- a/crates/nostr-relay-pool/src/relay/mod.rs +++ b/crates/nostr-relay-pool/src/relay/mod.rs @@ -984,6 +984,34 @@ mod tests { assert!(!relay.inner.is_running()); } + #[tokio::test] + async fn test_disconnect_unresponsive_during_try_connect() { + // Mock relay + let opts = RelayTestOptions { + unresponsive_connection: Some(Duration::from_secs(10)), + }; + let mock = MockRelay::run_with_opts(opts).await.unwrap(); + let url = RelayUrl::parse(&mock.url()).unwrap(); + + let relay = Relay::new(url); + + assert_eq!(relay.status(), RelayStatus::Initialized); + + // Terminate after 3 secs + let r = relay.clone(); + tokio::spawn(async move { + time::sleep(Duration::from_secs(3)).await; + r.disconnect(); + }); + + let res = relay.try_connect(Duration::from_secs(7)).await; + assert!(matches!(res.unwrap_err(), Error::TerminationRequest)); + + assert_eq!(relay.status(), RelayStatus::Terminated); + + assert!(!relay.inner.is_running()); + } + #[tokio::test] async fn test_wait_for_connection() { // Mock relay