Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
ilbertt committed Nov 7, 2023
2 parents 564ab94 + 5b88adb commit d3ad691
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 48 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Publish Docker image

on:
release:
types: [published]

jobs:
push_to_registry:
name: Push Docker image to Docker Hub
runs-on: ubuntu-latest
steps:
- name: Check out the repo
uses: actions/checkout@v4

- name: Log in to Docker Hub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}

- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@v5
with:
images: omniadevs/ic-websocket-gateway

- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: .
file: ./Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ There are some command line arguments that you can set when running the gateway:

A [Dockerfile](./Dockerfile) is provided, together with a [docker-compose.yml](./docker-compose.yml) file to run the gateway. Make sure you have [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/install/) installed.

A Docker image is also available at [omniadevs/ic-websocket-gateway](https://hub.docker.com/r/omniadevs/ic-websocket-gateway).

Steps to run the gateway with Docker:

1. Set the environment variables:

```
Expand Down
5 changes: 1 addition & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
services:
ic_websocket_gateway:
build:
context: ./
dockerfile: Dockerfile
image: ic-websocket-gateway
image: omniadevs/ic-websocket-gateway:latest
container_name: ic-websocket-gateway
restart: unless-stopped
ports:
Expand Down
129 changes: 85 additions & 44 deletions src/ic-websocket-gateway/src/ws_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ use std::{fs, sync::Arc, time::Duration};
use tokio::{
net::{TcpListener, TcpStream},
select,
sync::mpsc::{Receiver, Sender},
sync::mpsc::{self, Receiver, Sender},
time::timeout,
};
use tokio_native_tls::{TlsAcceptor, TlsStream};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, span, warn, Instrument, Level, Span};

/// Possible TCP streams.
enum CustomStream {
pub enum CustomStream {
Tcp(TcpStream),
TcpWithTls(TlsStream<TcpStream>),
}
Expand Down Expand Up @@ -91,8 +91,16 @@ impl WsListener {
let wait_for_cancellation = parent_token.cancelled();
tokio::pin!(wait_for_cancellation);

// [ws listener task] [tls acceptor task]
// tls_acceptor_rx <----- tls_acceptor_tx

// channel used by the tls acceptor task to let the ws listener task know when the handshake is complete
let (tls_acceptor_tx, mut tls_acceptor_rx): (
Sender<Result<(u64, CustomStream, ListenerEvents, Span), String>>,
Receiver<Result<(u64, CustomStream, ListenerEvents, Span), String>>,
) = mpsc::channel(1000);

let mut limiting_rate: f64 = 0.0;
let tls_timeout_duration = Duration::from_secs(5);
loop {
select! {
// bias select! to check token cancellation first
Expand All @@ -117,51 +125,27 @@ impl WsListener {
}
Ok((stream, client_addr)) = self.listener.accept() => {
if !is_in_rate_limit(limiting_rate) {
let current_client_id = self.next_client_id;
self.next_client_id += 1;

let mut listener_events = ListenerEvents::new(Some(EventsReference::ClientId(current_client_id)), EventsCollectionType::NewClientConnection, ListenerEventsMetrics::default());
listener_events.metrics.set_received_request();
let span = span!(
Level::INFO,
"handle_client_connection",
client_addr = ?client_addr,
client_id = current_client_id
accept_connection(
self.next_client_id,
client_addr,
stream,
self.tls_acceptor.clone(),
tls_acceptor_tx.clone()
);
let _guard = span.enter();

let stream = match self.tls_acceptor {
Some(ref acceptor) => {
// TODO: accept in a separate task as the handshake might block the WS listener for multiple seconds
match timeout(tls_timeout_duration, acceptor.accept(stream)).await {
Ok(Ok(tls_stream)) => {
debug!("TLS handshake successful");
listener_events.metrics.set_accepted_with_tls();
CustomStream::TcpWithTls(tls_stream)
},
Ok(Err(e)) => {
error!("TLS handshake failed: {:?}", e);
continue;
},
Err(e) => {
warn!("Accepting TLS connection timed out: {:?}", e);
continue;
}
}
},
None => {
listener_events.metrics.set_accepted_without_tls();
CustomStream::Tcp(stream)
},
};

self.start_connection_handler(stream, current_client_id, child_token.clone(), span.clone());

listener_events.metrics.set_started_handler();
self.events_channel_tx.send(Box::new(listener_events)).await.expect("analyzer's side of the channel dropped")
self.next_client_id += 1;
} else {
warn!("Ignoring incoming connection due to rate limiting policy");
}
},
Some(tls_acceptor_result) = tls_acceptor_rx.recv() => {
match tls_acceptor_result {
Ok((current_client_id , stream, mut listener_events, span)) => {
self.start_connection_handler(stream, current_client_id, child_token.clone(), span);
listener_events.metrics.set_started_handler();
self.events_channel_tx.send(Box::new(listener_events)).await.expect("analyzer's side of the channel dropped")
},
Err(e) => error!("{:?}", e)
}
}
}
}
Expand Down Expand Up @@ -218,3 +202,60 @@ fn is_in_rate_limit(limiting_rate: f64) -> bool {

random_value < limiting_rate
}

pub fn accept_connection(
current_client_id: u64,
client_addr: std::net::SocketAddr,
stream: TcpStream,
tls_acceptor: Option<TlsAcceptor>,
tls_acceptor_tx: Sender<Result<(u64, CustomStream, ListenerEvents, Span), String>>,
) {
tokio::spawn(async move {
let mut listener_events = ListenerEvents::new(
Some(EventsReference::ClientId(current_client_id)),
EventsCollectionType::NewClientConnection,
ListenerEventsMetrics::default(),
);
listener_events.metrics.set_received_request();

let span = span!(
Level::INFO,
"handle_client_connection",
client_addr = ?client_addr,
client_id = current_client_id
);
let _guard = span.enter();

let res = match tls_acceptor {
Some(ref acceptor) => {
match timeout(Duration::from_secs(10), acceptor.accept(stream)).await {
Ok(Ok(tls_stream)) => {
debug!("TLS handshake successful");
listener_events.metrics.set_accepted_with_tls();
Ok((
current_client_id,
CustomStream::TcpWithTls(tls_stream),
listener_events,
span.clone(),
))
},
Ok(Err(e)) => Err(format!("TLS handshake failed: {:?}", e)),
Err(e) => Err(format!("Accepting TLS connection timed out: {:?}", e)),
}
},
None => {
listener_events.metrics.set_accepted_without_tls();
Ok((
current_client_id,
CustomStream::Tcp(stream),
listener_events,
span.clone(),
))
},
};
tls_acceptor_tx
.send(res)
.await
.expect("ws listener's side of the channel dropped");
});
}

0 comments on commit d3ad691

Please sign in to comment.