From 583143f4c73a40b3a7f52e9d2fc030974371ef80 Mon Sep 17 00:00:00 2001 From: massimoalbarello Date: Fri, 3 Nov 2023 17:04:42 +0100 Subject: [PATCH 1/5] accept tls connection in a separate task from the ws listener --- src/ic-websocket-gateway/src/ws_listener.rs | 104 ++++++++++++-------- 1 file changed, 64 insertions(+), 40 deletions(-) diff --git a/src/ic-websocket-gateway/src/ws_listener.rs b/src/ic-websocket-gateway/src/ws_listener.rs index 552a20c..0b937d7 100644 --- a/src/ic-websocket-gateway/src/ws_listener.rs +++ b/src/ic-websocket-gateway/src/ws_listener.rs @@ -11,7 +11,7 @@ use std::{fs, sync::Arc, time::Duration}; use tokio::{ net::{TcpListener, TcpStream}, select, - sync::mpsc::{Receiver, Sender}, + sync::mpsc::{self, Receiver, Sender}, time::timeout, }; use tokio_native_tls::{TlsAcceptor, TlsStream}; @@ -91,8 +91,17 @@ impl WsListener { let wait_for_cancellation = parent_token.cancelled(); tokio::pin!(wait_for_cancellation); + // [ws listener task] [tls acceptor task] + // tls_acceptor_rx <----- tls_acceptor_tx + + // channel used by the tls acceptor task to let the ws listener task know when the handshake is complete + let (tls_acceptor_tx, mut tls_acceptor_rx): ( + Sender>, + Receiver>, + ) = mpsc::channel(1000); + let mut limiting_rate: f64 = 0.0; - let tls_timeout_duration = Duration::from_secs(5); + let tls_timeout_duration = Duration::from_secs(10); loop { select! { // bias select! to check token cancellation first @@ -120,48 +129,63 @@ impl WsListener { let current_client_id = self.next_client_id; self.next_client_id += 1; - let mut listener_events = ListenerEvents::new(Some(EventsReference::ClientId(current_client_id)), EventsCollectionType::NewClientConnection, ListenerEventsMetrics::default()); - listener_events.metrics.set_received_request(); - let span = span!( - Level::INFO, - "handle_client_connection", - client_addr = ?client_addr, - client_id = current_client_id - ); - let _guard = span.enter(); - - let stream = match self.tls_acceptor { - Some(ref acceptor) => { - // TODO: accept in a separate task as the handshake might block the WS listener for multiple seconds - match timeout(tls_timeout_duration, acceptor.accept(stream)).await { - Ok(Ok(tls_stream)) => { - debug!("TLS handshake successful"); - listener_events.metrics.set_accepted_with_tls(); - CustomStream::TcpWithTls(tls_stream) - }, - Ok(Err(e)) => { - error!("TLS handshake failed: {:?}", e); - continue; - }, - Err(e) => { - warn!("Accepting TLS connection timed out: {:?}", e); - continue; + let tls_acceptor = self.tls_acceptor.clone(); + let tls_acceptor_tx_cl = tls_acceptor_tx.clone(); + + tokio::spawn(async move { + let mut listener_events = ListenerEvents::new( + Some(EventsReference::ClientId(current_client_id)), + EventsCollectionType::NewClientConnection, + ListenerEventsMetrics::default(), + ); + listener_events.metrics.set_received_request(); + + let span = span!( + Level::INFO, + "handle_client_connection", + client_addr = ?client_addr, + client_id = current_client_id + ); + let _guard = span.enter(); + + let res = match tls_acceptor { + Some(ref acceptor) => { + match timeout(tls_timeout_duration, acceptor.accept(stream)).await { + Ok(Ok(tls_stream)) => { + debug!("TLS handshake successful"); + listener_events.metrics.set_accepted_with_tls(); + Ok((current_client_id, CustomStream::TcpWithTls(tls_stream), listener_events, span.clone())) + }, + Ok(Err(e)) => { + Err(format!("TLS handshake failed: {:?}", e)) + + }, + Err(e) => { + Err(format!("Accepting TLS connection timed out: {:?}", e)) + + } } - } - }, - None => { - listener_events.metrics.set_accepted_without_tls(); - CustomStream::Tcp(stream) - }, - }; - - self.start_connection_handler(stream, current_client_id, child_token.clone(), span.clone()); - - listener_events.metrics.set_started_handler(); - self.events_channel_tx.send(Box::new(listener_events)).await.expect("analyzer's side of the channel dropped") + }, + None => { + listener_events.metrics.set_accepted_without_tls(); + Ok((current_client_id, CustomStream::Tcp(stream), listener_events, span.clone())) + }, + }; + tls_acceptor_tx_cl.send(res).await.expect("ws listener's side of the channel dropped"); + }); } else { warn!("Ignoring incoming connection due to rate limiting policy"); } + }, + Some(tls_acceptor_result) = tls_acceptor_rx.recv() => { + match tls_acceptor_result { + Ok((current_client_id , stream, mut listener_events, span)) => { + self.start_connection_handler(stream, current_client_id, child_token.clone(), span); + listener_events.metrics.set_started_handler(); + self.events_channel_tx.send(Box::new(listener_events)).await.expect("analyzer's side of the channel dropped") + }, + Err(e) => error!("{:?}", e) + } } } } From ac99ec489bc86d39b027d604534e385fd2a1a81b Mon Sep 17 00:00:00 2001 From: massimoalbarello Date: Fri, 3 Nov 2023 17:45:59 +0100 Subject: [PATCH 2/5] chore: refactored accept connection of ws listener --- src/ic-websocket-gateway/src/ws_listener.rs | 113 +++++++++++--------- 1 file changed, 65 insertions(+), 48 deletions(-) diff --git a/src/ic-websocket-gateway/src/ws_listener.rs b/src/ic-websocket-gateway/src/ws_listener.rs index 0b937d7..4fda8bc 100644 --- a/src/ic-websocket-gateway/src/ws_listener.rs +++ b/src/ic-websocket-gateway/src/ws_listener.rs @@ -19,7 +19,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, span, warn, Instrument, Level, Span}; /// Possible TCP streams. -enum CustomStream { +pub enum CustomStream { Tcp(TcpStream), TcpWithTls(TlsStream), } @@ -101,7 +101,6 @@ impl WsListener { ) = mpsc::channel(1000); let mut limiting_rate: f64 = 0.0; - let tls_timeout_duration = Duration::from_secs(10); loop { select! { // bias select! to check token cancellation first @@ -126,53 +125,14 @@ impl WsListener { } Ok((stream, client_addr)) = self.listener.accept() => { if !is_in_rate_limit(limiting_rate) { - let current_client_id = self.next_client_id; + accept_connection( + self.next_client_id, + client_addr, + stream, + self.tls_acceptor.clone(), + tls_acceptor_tx.clone() + ); self.next_client_id += 1; - - let tls_acceptor = self.tls_acceptor.clone(); - let tls_acceptor_tx_cl = tls_acceptor_tx.clone(); - - tokio::spawn(async move { - let mut listener_events = ListenerEvents::new( - Some(EventsReference::ClientId(current_client_id)), - EventsCollectionType::NewClientConnection, - ListenerEventsMetrics::default(), - ); - listener_events.metrics.set_received_request(); - - let span = span!( - Level::INFO, - "handle_client_connection", - client_addr = ?client_addr, - client_id = current_client_id - ); - let _guard = span.enter(); - - let res = match tls_acceptor { - Some(ref acceptor) => { - match timeout(tls_timeout_duration, acceptor.accept(stream)).await { - Ok(Ok(tls_stream)) => { - debug!("TLS handshake successful"); - listener_events.metrics.set_accepted_with_tls(); - Ok((current_client_id, CustomStream::TcpWithTls(tls_stream), listener_events, span.clone())) - }, - Ok(Err(e)) => { - Err(format!("TLS handshake failed: {:?}", e)) - - }, - Err(e) => { - Err(format!("Accepting TLS connection timed out: {:?}", e)) - - } - } - }, - None => { - listener_events.metrics.set_accepted_without_tls(); - Ok((current_client_id, CustomStream::Tcp(stream), listener_events, span.clone())) - }, - }; - tls_acceptor_tx_cl.send(res).await.expect("ws listener's side of the channel dropped"); - }); } else { warn!("Ignoring incoming connection due to rate limiting policy"); } @@ -242,3 +202,60 @@ fn is_in_rate_limit(limiting_rate: f64) -> bool { random_value < limiting_rate } + +pub fn accept_connection( + current_client_id: u64, + client_addr: std::net::SocketAddr, + stream: TcpStream, + tls_acceptor: Option, + tls_acceptor_tx: Sender>, +) { + tokio::spawn(async move { + let mut listener_events = ListenerEvents::new( + Some(EventsReference::ClientId(current_client_id)), + EventsCollectionType::NewClientConnection, + ListenerEventsMetrics::default(), + ); + listener_events.metrics.set_received_request(); + + let span = span!( + Level::INFO, + "handle_client_connection", + client_addr = ?client_addr, + client_id = current_client_id + ); + let _guard = span.enter(); + + let res = match tls_acceptor { + Some(ref acceptor) => { + match timeout(Duration::from_secs(10), acceptor.accept(stream)).await { + Ok(Ok(tls_stream)) => { + debug!("TLS handshake successful"); + listener_events.metrics.set_accepted_with_tls(); + Ok(( + current_client_id, + CustomStream::TcpWithTls(tls_stream), + listener_events, + span.clone(), + )) + }, + Ok(Err(e)) => Err(format!("TLS handshake failed: {:?}", e)), + Err(e) => Err(format!("Accepting TLS connection timed out: {:?}", e)), + } + }, + None => { + listener_events.metrics.set_accepted_without_tls(); + Ok(( + current_client_id, + CustomStream::Tcp(stream), + listener_events, + span.clone(), + )) + }, + }; + tls_acceptor_tx + .send(res) + .await + .expect("ws listener's side of the channel dropped"); + }); +} From 5eb2f43e5cd8c7ff9f454a074bb0d37d7d488834 Mon Sep 17 00:00:00 2001 From: Luca8991 Date: Tue, 7 Nov 2023 18:25:13 +0100 Subject: [PATCH 3/5] chore: github action publish docker image to hub --- .github/workflows/publish.yml | 36 +++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 .github/workflows/publish.yml diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..17b78f1 --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,36 @@ +name: Publish Docker image + +on: + release: + types: [published] + +jobs: + push_to_registry: + name: Push Docker image to Docker Hub + runs-on: ubuntu-latest + steps: + - name: Check out the repo + uses: actions/checkout@v4 + + - name: Log in to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@v5 + with: + images: omniadevs/ic-websocket-gateway + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + context: . + file: ./Dockerfile + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max From 9f63c6a06cbc21998c38c464994fa98a2bf7233b Mon Sep 17 00:00:00 2001 From: Luca8991 Date: Tue, 7 Nov 2023 18:30:19 +0100 Subject: [PATCH 4/5] chore: image from hub in docker compose --- docker-compose.yml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 660d75d..4cc4f9f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,9 +1,6 @@ services: ic_websocket_gateway: - build: - context: ./ - dockerfile: Dockerfile - image: ic-websocket-gateway + image: omniadevs/ic-websocket-gateway:latest container_name: ic-websocket-gateway restart: unless-stopped ports: From 5b88adb895acdd5a527874a96672fb0932c26d2c Mon Sep 17 00:00:00 2001 From: Luca8991 Date: Tue, 7 Nov 2023 18:30:46 +0100 Subject: [PATCH 5/5] chore: docker hub image link --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 0e11987..9bbefc1 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,10 @@ There are some command line arguments that you can set when running the gateway: A [Dockerfile](./Dockerfile) is provided, together with a [docker-compose.yml](./docker-compose.yml) file to run the gateway. Make sure you have [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/install/) installed. +A Docker image is also available at [omniadevs/ic-websocket-gateway](https://hub.docker.com/r/omniadevs/ic-websocket-gateway). + +Steps to run the gateway with Docker: + 1. Set the environment variables: ```