diff --git a/docker/Dockerfile.core b/docker/Dockerfile.core index 2747b3351..484735d30 100644 --- a/docker/Dockerfile.core +++ b/docker/Dockerfile.core @@ -1,4 +1,4 @@ -FROM python:3.7-alpine3.14 AS build_shared +FROM python:3.10-alpine3.14 AS build_shared WORKDIR /build_shared/ @@ -6,9 +6,18 @@ RUN pip install --no-cache-dir build COPY ./src/shared/. . RUN python -m build +RUN apk add --no-cache \ + gcc \ + build-base\ + libc-dev\ + linux-headers + +COPY ./src/core/sse/forward.c . + +RUN gcc -o forward forward.c -FROM python:3.7-alpine3.14 AS production +FROM python:3.10-alpine3.14 AS production WORKDIR /app/ @@ -36,6 +45,10 @@ RUN \ apk add --no-cache --virtual .build-deps \ gcc \ g++ \ + build-base\ + libc-dev\ + zlib-dev \ + linux-headers \ make \ glib-dev \ musl-dev \ @@ -45,6 +58,9 @@ RUN \ pip install --no-cache-dir -r /app/requirements.txt && \ apk --purge del .build-deps + +COPY --from=build_shared /build_shared/forward /usr/local/bin/forward + COPY ./docker/start.sh /start.sh RUN chmod +x /start.sh diff --git a/docker/prestart_core.sh b/docker/prestart_core.sh index 1b50c5cac..cab6d2f25 100644 --- a/docker/prestart_core.sh +++ b/docker/prestart_core.sh @@ -2,6 +2,9 @@ echo "Running inside /app/prestart.sh..." +echo "Running sse forward in the background..." +/usr/local/bin/forward --sender-port 5000 --client-port 5001 & + echo "Running migrations..." /app/db_migration.py db upgrade head diff --git a/src/core/api/__init__.py b/src/core/api/__init__.py index c99c52b0d..f5b2a8983 100644 --- a/src/core/api/__init__.py +++ b/src/core/api/__init__.py @@ -1,2 +1,2 @@ __all__ = ['collectors', 'assess', 'auth', 'config', 'analyze', 'publish', - 'user', 'assets', 'bots', 'isalive', 'remote', 'dashboard'] + 'user', 'assets', 'bots', 'isalive', 'remote', 'dashboard', 'sse'] diff --git a/src/core/api/analyze.py b/src/core/api/analyze.py index 7c9ee6d26..101cf77a8 100644 --- a/src/core/api/analyze.py +++ b/src/core/api/analyze.py @@ -2,7 +2,8 @@ from flask import request, jsonify, send_file from flask_restful import Resource -from managers import asset_manager, auth_manager, sse_manager, log_manager +from managers import asset_manager, auth_manager, log_manager +from managers.sse_manager import sse_manager from managers.auth_manager import auth_required, ACLCheck from model import attribute, report_item, report_item_type from model.permission import Permission diff --git a/src/core/api/assess.py b/src/core/api/assess.py index 897ed3a86..20d015ced 100644 --- a/src/core/api/assess.py +++ b/src/core/api/assess.py @@ -2,7 +2,8 @@ from flask import request, send_file from flask_restful import Resource -from managers import auth_manager, sse_manager, log_manager +from managers import auth_manager, log_manager +from managers.sse_manager import sse_manager from managers.auth_manager import ACLCheck, auth_required from model import news_item, osint_source from model.permission import Permission diff --git a/src/core/api/bots.py b/src/core/api/bots.py index 2388941c5..577f0f25c 100644 --- a/src/core/api/bots.py +++ b/src/core/api/bots.py @@ -1,7 +1,8 @@ from flask import request from flask_restful import Resource, reqparse -from managers import sse_manager, log_manager +from managers import log_manager +from managers.sse_manager import sse_manager from managers.auth_manager import api_key_required from model import bot_preset, news_item, word_list diff --git a/src/core/api/collectors.py b/src/core/api/collectors.py index 2481a2118..c4018f134 100644 --- a/src/core/api/collectors.py +++ b/src/core/api/collectors.py @@ -1,7 +1,7 @@ from flask import request from flask_restful import Resource, reqparse -from managers import sse_manager +from managers.sse_manager import sse_manager from managers.auth_manager import api_key_required from managers.log_manager import log_debug from model import osint_source, collectors_node, news_item diff --git a/src/core/api/config.py b/src/core/api/config.py index 76596b211..9cb65c530 100644 --- a/src/core/api/config.py +++ b/src/core/api/config.py @@ -3,8 +3,9 @@ from flask import request, send_file from flask_restful import Resource -from managers import auth_manager, sse_manager, remote_manager, presenters_manager, \ +from managers import auth_manager, remote_manager, presenters_manager, \ publishers_manager, bots_manager, external_auth_manager, log_manager, collectors_manager +from managers.sse_manager import sse_manager from managers.auth_manager import auth_required, get_user_from_jwt from model import acl_entry, remote, presenters_node, publisher_preset, publishers_node, \ bots_node, bot_preset, attribute, collectors_node, organization, osint_source, product_type, \ diff --git a/src/core/api/sse.py b/src/core/api/sse.py new file mode 100644 index 000000000..2c00d148d --- /dev/null +++ b/src/core/api/sse.py @@ -0,0 +1,80 @@ +from flask_restful import Resource +import socket +import json +from flask import Response, stream_with_context, Flask +from flask import request, abort +from managers import auth_manager, bots_manager, time_manager, remote_manager + + +class TaranisSSE(Resource): + + @stream_with_context + def stream(self): + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(60) + except Exception as e: + print(f"Failed to create socket: {e}") + return + + try: + sock.connect(('localhost', 5001)) + sock.settimeout(1) + except Exception as e: + print(f"Failed to connect to server: {e}") + sock.close() + return + + buffer = "" + try: + data = b"" + while True: + try: + b = sock.recv(1) + except socket.timeout: + yield ":\n\n" + continue + + if not b: + break + data += b + try: + char = data.decode('utf-8') + data = b"" + except UnicodeDecodeError: + yield ":\n\n" + continue + + buffer += char + + if char == '}': + try: + json_data = json.loads(buffer) + except json.JSONDecodeError: + continue + + yield f"event: {json_data['event']}\ndata: {json.dumps(json_data['data'])}\n\n" + buffer = "" + except Exception as e: + print(f"Error during data reception or processing: {e}") + finally: + sock.close() + + def get(self): + try: + if request.args.get('jwt') is not None: + if auth_manager.decode_user_from_jwt(request.args.get('jwt')) is None: + return "", 403 + elif request.args.get('api_key') is not None: + if bots_manager.verify_api_key(request.args.get('api_key')) is False: + return "", 403 + else: + return "", 403 + + return Response(self.stream(), mimetype="text/event-stream") + except Exception as e: + return str(e), 500 + + +def initialize(api): + api.add_resource(TaranisSSE, "/sse") diff --git a/src/core/app.py b/src/core/app.py index 5223a52d6..ba6f2475c 100644 --- a/src/core/app.py +++ b/src/core/app.py @@ -5,6 +5,7 @@ from model import * # just until all new model classes are used regularly + def create_app(): app = Flask(__name__) app.config.from_object('config.Config') @@ -17,7 +18,6 @@ def create_app(): auth_manager.initialize(app) api_manager.initialize(app) - sse_manager.initialize(app) remote_manager.initialize(app) tagcloud_manager.initialize(app) diff --git a/src/core/managers/api_manager.py b/src/core/managers/api_manager.py index a8b7e765f..fc43ec116 100644 --- a/src/core/managers/api_manager.py +++ b/src/core/managers/api_manager.py @@ -1,5 +1,4 @@ from flask_restful import Api - from api import * @@ -11,6 +10,7 @@ def initialize(app): collectors.initialize(api) isalive.initialize(api) config.initialize(api) + sse.initialize(api) analyze.initialize(api) publish.initialize(api) user.initialize(api) diff --git a/src/core/managers/remote_manager.py b/src/core/managers/remote_manager.py index 304da82a0..576807f08 100644 --- a/src/core/managers/remote_manager.py +++ b/src/core/managers/remote_manager.py @@ -52,7 +52,7 @@ def run(self): {'last_sync_time': data['last_sync_time']}) with EventThread.app.app_context(): - sse_manager.news_items_updated() + sse_manager.sse_manager.news_items_updated() elif event.event == 'remote_access_report_items_updated': if self.remote_node.sync_report_items: @@ -68,7 +68,7 @@ def run(self): {'last_sync_time': data['last_sync_time']}) with EventThread.app.app_context(): - sse_manager.report_items_updated() + sse_manager.sse_manager.report_items_updated() except Exception: time.sleep(5) diff --git a/src/core/managers/sse_manager.py b/src/core/managers/sse_manager.py index 4947fa507..831d0307b 100644 --- a/src/core/managers/sse_manager.py +++ b/src/core/managers/sse_manager.py @@ -1,113 +1,80 @@ -from flask import request, abort -from flask_sse import sse +from sse.sse import SSE from datetime import datetime -import os from managers import auth_manager, bots_manager, time_manager, remote_manager from model.remote import RemoteAccess -def use_sse(): - return os.getenv("REDIS_URL") is not None +class SSEManager: + def __init__(self): + self.report_item_locks: dict = {} + self.sse = SSE() + self.report_item_locks_last_check_time = datetime.now() + def news_items_updated(self): + self.sse.publish({}, event='news-items-updated') -def news_items_updated(): - if use_sse(): - sse.publish({}, type='news-items-updated') + def report_items_updated(self): + self.sse.publish({}, event='report-items-updated') + def report_item_updated(self, data): + self.sse.publish(data, event='report-item-updated') -def report_items_updated(): - if use_sse(): - sse.publish({}, type='report-items-updated') + def remote_access_disconnect(self, data): + self.sse.publish(data, event='remote_access_disconnect', channel='remote') - -def report_item_updated(data): - if use_sse(): - sse.publish(data, type='report-item-updated') - - -def remote_access_disconnect(data): - if use_sse(): - sse.publish(data, type='remote_access_disconnect', channel='remote') - - -def remote_access_news_items_updated(osint_source_ids): - if use_sse(): + def remote_access_news_items_updated(self, osint_source_ids): remote_access_event_ids = RemoteAccess.get_relevant_for_news_items(osint_source_ids) - sse.publish(remote_access_event_ids, type='remote_access_news_items_updated', channel='remote') - + self.sse.publish(remote_access_event_ids, event='remote_access_news_items_updated', channel='remote') -def remote_access_report_items_updated(report_item_type_id): - if use_sse(): + def remote_access_report_items_updated(self, report_item_type_id): remote_access_event_ids = RemoteAccess.get_relevant_for_report_item(report_item_type_id) - sse.publish(remote_access_event_ids, type='remote_access_report_items_updated', channel='remote') - - -report_item_locks = {} -report_item_locks_last_check_time = datetime.now() - - -@sse.before_request -def connect(): - if request.args.get('jwt') is not None: - if auth_manager.decode_user_from_jwt(request.args.get('jwt')) is None: - abort(403) - elif request.args.get('api_key') is not None: - if bots_manager.verify_api_key(request.args.get('api_key')) is False: - abort(403) - elif request.args.get('access_key') is not None and request.args.get('channel') is not None: - if request.args.get('channel') == 'remote': - if remote_manager.verify_access_key(request.args.get('access_key')) is False: - abort(403) - else: - abort(403) - - -def report_item_lock(report_item_id, field_id, user_id): - if report_item_id in report_item_locks: - report_item = report_item_locks[report_item_id] - else: - report_item = {} - report_item_locks[report_item_id] = report_item + self.sse.publish(remote_access_event_ids, event='remote_access_report_items_updated', channel='remote') - if field_id not in report_item or report_item[field_id] is None: - report_item[field_id] = {'user_id': user_id, 'lock_time': datetime.now()} - sse.publish({'report_item_id': int(report_item_id), 'field_id': field_id, 'user_id': user_id}, - type='report-item-locked') + def report_item_lock(self, report_item_id, field_id, user_id): + if report_item_id in self.report_item_locks: + report_item = self.report_item_locks[report_item_id] + else: + report_item = {} + self.report_item_locks[report_item_id] = report_item + if field_id not in report_item or report_item[field_id] is None: + report_item[field_id] = {'user_id': user_id, 'lock_time': datetime.now()} + self.sse.publish({'report_item_id': int(report_item_id), 'field_id': field_id, 'user_id': user_id}, + event='report-item-locked') -def report_item_unlock(report_item_id, field_id, user_id): - if report_item_id in report_item_locks: - report_item = report_item_locks[report_item_id] + def report_item_unlock(self, report_item_id, field_id, user_id): + if report_item_id in self.report_item_locks: + report_item = self.report_item_locks[report_item_id] - if field_id in report_item: - report_item[field_id] = None + if field_id in report_item: + report_item[field_id] = None - sse.publish({'report_item_id': int(report_item_id), 'field_id': field_id, 'user_id': user_id}, - type='report-item-unlocked') + self.sse.publish({'report_item_id': int(report_item_id), 'field_id': field_id, 'user_id': user_id}, + event='report-item-unlocked') + def report_item_hold_lock(self, report_item_id, field_id, user_id): + if report_item_id in self.report_item_locks: + report_item = self.report_item_locks[report_item_id] + if field_id in report_item and report_item[field_id] is not None: + if report_item[field_id]['user_id'] == user_id: + report_item[field_id]['lock_time'] = datetime.now() -def report_item_hold_lock(report_item_id, field_id, user_id): - if report_item_id in report_item_locks: - report_item = report_item_locks[report_item_id] - if field_id in report_item and report_item[field_id] is not None: - if report_item[field_id]['user_id'] == user_id: - report_item[field_id]['lock_time'] = datetime.now() + def check_report_item_locks(self, app): + for key in self.report_item_locks: + for field_key in self.report_item_locks[key]: + if self.report_item_locks[key][field_key] is not None: + if self.report_item_locks[key][field_key]['lock_time'] < self.report_item_locks_last_check_time: + self.report_item_locks[key][field_key] = None + with app.app_context(): + self.sse.publish({'report_item_id': int(key), 'field_id': field_key, 'user_id': -1}, + event='report-item-unlocked') + self.report_item_locks_last_check_time = datetime.now() -def check_report_item_locks(app): - global report_item_locks_last_check_time - for key in report_item_locks: - for field_key in report_item_locks[key]: - if report_item_locks[key][field_key] is not None: - if report_item_locks[key][field_key]['lock_time'] < report_item_locks_last_check_time: - report_item_locks[key][field_key] = None - with app.app_context(): - sse.publish({'report_item_id': int(key), 'field_id': field_key, 'user_id': -1}, - type='report-item-unlocked') - report_item_locks_last_check_time = datetime.now() +sse_manager = SSEManager() def initialize(app): - time_manager.schedule_job_minutes(1, check_report_item_locks, app) + time_manager.schedule_job_minutes(1, sse_manager.check_report_item_locks, app) diff --git a/src/core/requirements.txt b/src/core/requirements.txt index 5784a289b..f19f9fa9f 100644 --- a/src/core/requirements.txt +++ b/src/core/requirements.txt @@ -1,13 +1,12 @@ alembic==1.10.2 certifi==2019.11.28 Flask==1.1.4 -Flask-Cors==3.0.10 -Flask-JWT-Extended==3.24.1 +Flask-Cors==4.0.1 +Flask-JWT-Extended==3.25.1 Flask-Migrate==2.5.2 flask-oidc==1.4.0 -Flask-RESTful==0.3.7 +Flask-RESTful==0.3.10 Flask-Script==2.0.6 -Flask-SSE==0.2.1 Flask-SQLAlchemy==2.5.1 gevent==21.8.0 greenlet==1.1.1 @@ -16,7 +15,7 @@ idna==2.9 Jinja2==2.11.3 ldap3==2.9.1 Mako==1.1.0 -MarkupSafe==1.1.0 +MarkupSafe==1.1.1 marshmallow==3.18.0 marshmallow-enum==1.5.1 psycogreen==1.0.2 @@ -32,7 +31,7 @@ schedule==0.6.0 six==1.13.0 sseclient-py==1.7 soupsieve==1.9.5 -SQLAlchemy==1.4.47 +SQLAlchemy==1.4.52 urllib3==1.26.7 -Werkzeug==0.16.0 +Werkzeug==1.0.1 pycryptodomex==3.17 diff --git a/src/core/run.py b/src/core/run.py index f4dfb7ed5..dd7c0af77 100755 --- a/src/core/run.py +++ b/src/core/run.py @@ -6,9 +6,7 @@ from psycogreen import gevent as g g.patch_psycopg() -from flask_sse import sse from app import create_app app = create_app() -app.register_blueprint(sse, url_prefix='/sse') diff --git a/src/core/sse/__init__.py b/src/core/sse/__init__.py new file mode 100644 index 000000000..417f88cc1 --- /dev/null +++ b/src/core/sse/__init__.py @@ -0,0 +1 @@ +__all__ = ['sse'] diff --git a/src/core/sse/forward.c b/src/core/sse/forward.c new file mode 100644 index 000000000..ca6befae6 --- /dev/null +++ b/src/core/sse/forward.c @@ -0,0 +1,458 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define PEER_POOL_INCREMENT 1024 +#define DEFAULT_MESSAGE_SIZE 512 + +// this must be much higher than the expected message size, because clients may not be +// scheduled before multiple messages are received and buffered for transmission. +#define CLIENT_BUFFER_SIZE 16384 +#define PEER_TYPE_SENDER 0 +#define PEER_TYPE_CLIENT 1 + +#define PORT_SENDER 50000 +#define PORT_CLIENT 50001 + +struct client_s { + char circular_buffer[CLIENT_BUFFER_SIZE]; + int head; // append end + int tail; // read end +}; + +struct sender_s { + char *message; + int msg_len; + int msg_max_len; +}; + +struct peer_s { + int fd; + int type; + union { + struct client_s client; + struct sender_s sender; + } extra; +}; + +static int n_peers; +static int max_peers; +static struct peer_s *peers; +static struct pollfd *fds; +static int verbose_level; // 1-nothing 2-malloc 3-input 4-parser 5-all + +int will_hangup(int idx) { + return (peers[idx].fd == -1); +} + +void prepare_for_hangup(int idx) { + shutdown(peers[idx].fd, 2); + close(peers[idx].fd); + peers[idx].fd = -1; +} + +void hangup_peer(int index) { + if (peers[index].type == PEER_TYPE_SENDER) { + if (peers[index].extra.sender.message) { + free(peers[index].extra.sender.message); + } + } + peers[index] = peers[n_peers - 1]; + fds[index] = fds[n_peers - 1]; + n_peers--; +} + +void hangup_peers() { + for (int i = n_peers - 1; i >= 2; i--) { + if (will_hangup(i)) { + hangup_peer(i); + } + } +} + +int setup_server_socket(int port) { + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) { + perror("Socket creation failed"); + exit(EXIT_FAILURE); + } + + int opt = 1; + setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + + if (bind(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) { + perror("Socket bind failed"); + exit(EXIT_FAILURE); + } + + if (listen(sockfd, 10) < 0) { + perror("Socket listen failed"); + exit(EXIT_FAILURE); + } + + return sockfd; +} + +int accept_peer(int listenfd, int peer_type) { + if (max_peers == n_peers) { + struct peer_s *peers_realloc; + struct pollfd *fds_realloc; + + peers_realloc = realloc(peers, sizeof(struct peer_s) * (max_peers + PEER_POOL_INCREMENT)); + fds_realloc = realloc(fds, sizeof(struct pollfd) * (max_peers + PEER_POOL_INCREMENT)); + if (peers_realloc) + peers = peers_realloc; + if (fds_realloc) + fds = fds_realloc; + if (!peers_realloc || !fds_realloc) + return -1; + max_peers += PEER_POOL_INCREMENT; + } + + bzero(&peers[n_peers], sizeof(struct peer_s)); + peers[n_peers].fd = accept(listenfd, NULL, NULL); + if (peers[n_peers].fd == -1) { + perror("Accept failed"); + return -1; + } + peers[n_peers].type = peer_type; + fds[n_peers].fd = peers[n_peers].fd; + + if (peer_type == PEER_TYPE_SENDER) { + char *new_buf = malloc(DEFAULT_MESSAGE_SIZE); + if (verbose_level == 2 || verbose_level == 5) + printf("Sender peer %d fd %d malloc %d = %p\n", n_peers, peers[n_peers].fd, DEFAULT_MESSAGE_SIZE, new_buf); + if (new_buf == NULL) { + perror("Malloc failed"); + shutdown(peers[n_peers].fd, 2); + close(peers[n_peers].fd); + return -1; + } + peers[n_peers].extra.sender.message = new_buf; + peers[n_peers].extra.sender.msg_len = 0; + peers[n_peers].extra.sender.msg_max_len = DEFAULT_MESSAGE_SIZE; + fds[n_peers].events = POLLIN; + } else { + fds[n_peers].events = POLLIN; + } + fds[n_peers].revents = 0; + return n_peers++; +} + +int append_to_peer(int index, char *buf, int len) { + if (peers[index].type != PEER_TYPE_CLIENT) + return -1; + + struct client_s *client = &peers[index].extra.client; + + int space_remaining = (client->tail - client->head + CLIENT_BUFFER_SIZE - 1) % CLIENT_BUFFER_SIZE; + int space_until_wrap = CLIENT_BUFFER_SIZE - client->head; + + if (space_remaining < len) { + // cannot append the message for the client; kick it out + return -1; + } + if (len <= space_until_wrap) { + memcpy(client->circular_buffer + client->head, buf, len); + } else { + memcpy(client->circular_buffer + client->head, buf, space_until_wrap); + memcpy(client->circular_buffer, buf + space_until_wrap, len - space_until_wrap); + } + client->head = (client->head + len) % CLIENT_BUFFER_SIZE; + return 0; +} + +int receive_jsons(int idx) { + char *new_message_space; + int jsons_received = 0; + + if (peers[idx].extra.sender.msg_len == peers[idx].extra.sender.msg_max_len) { + new_message_space = realloc(peers[idx].extra.sender.message, + peers[idx].extra.sender.msg_max_len + DEFAULT_MESSAGE_SIZE); + if (new_message_space == NULL) { + perror("Cannot resize sender buffer"); + return -1; + } + peers[idx].extra.sender.message = new_message_space; + peers[idx].extra.sender.msg_max_len += DEFAULT_MESSAGE_SIZE; + } + + if (verbose_level == 3 || verbose_level == 5) + printf("From client %d reading %d bytes frk %p(originally %p)\n", idx, + peers[idx].extra.sender.msg_max_len - peers[idx].extra.sender.msg_len, + peers[idx].extra.sender.message + peers[idx].extra.sender.msg_len, peers[idx].extra.sender.message); + int len = read(peers[idx].fd, + peers[idx].extra.sender.message + peers[idx].extra.sender.msg_len, + peers[idx].extra.sender.msg_max_len - peers[idx].extra.sender.msg_len); + if (verbose_level == 3 || verbose_level == 5) printf("Reads %d bytes\n", len); + + if (len <= 0) { + // remote side closed connection + if (verbose_level == 3 || verbose_level == 5) + printf("Preparing to hang client %d fd %d, due to len=0\n", idx, peers[idx].fd); + prepare_for_hangup(idx); + return -1; + } + peers[idx].extra.sender.msg_len += len; + + while (1) { + int brackets = 0; + int in_string = 0; /* 0, 1, 2 */ + int i; + for (i = 0; i < peers[idx].extra.sender.msg_len; i++) { + if (verbose_level == 4 || verbose_level == 5) + printf("Processing character [%c], in_string:%d, i:%d/%d, brackets:%d\n", + (peers[idx].extra.sender.message[i] >= 32 && peers[idx].extra.sender.message[i] < 127) + ? peers[idx].extra.sender.message[i] : '.', in_string, i, peers[idx].extra.sender.msg_len, + brackets); + if (in_string) { + if (in_string == 2) { + if (verbose_level == 4 || verbose_level == 5) printf("Ignore this character\n"); + in_string = 1; + continue; + } + if (peers[idx].extra.sender.message[i] == '"') { + if (verbose_level == 4 || verbose_level == 5) printf("Ending quotation marks\n"); + in_string = 0; + continue; + } + if (peers[idx].extra.sender.message[i] == '\\') { + if (verbose_level == 4 || verbose_level == 5) printf("Beginning of backslash, ignore rest\n"); + in_string = 2; + continue; + } + continue; + } + if (verbose_level == 4 || verbose_level == 5) printf("Not in string\n"); + if (peers[idx].extra.sender.message[i] == '{') { + if (verbose_level == 4 || verbose_level == 5) printf("Start of bracket\n"); + brackets++; + } else if (peers[idx].extra.sender.message[i] == '}') { + if (verbose_level == 4 || verbose_level == 5) printf("End of bracket\n"); + brackets--; + if (brackets <= 0) { + if (verbose_level == 4 || verbose_level == 5) printf("Last bracket was send\n"); + break; + } + } else if (peers[idx].extra.sender.message[i] == '"') { + if (verbose_level == 4 || verbose_level == 5) printf("Start of string\n"); + in_string = 1; + continue; + } + } + if (verbose_level == 4 || verbose_level == 5) printf("==== END OF PROCESSING ====\n"); + if (i == peers[idx].extra.sender.msg_len) { + if (verbose_level == 4 || verbose_level == 5) printf("Message not found\n"); + // no new messages + break; + } + // we have a complete JSON to send out; store it in the output buffers + jsons_received++; + + for (int j = 2; j < n_peers; j++) { + if (peers[j].type == PEER_TYPE_CLIENT) { + if (append_to_peer(j, peers[idx].extra.sender.message, i + 1) == -1) { + // kick the client, it doesn't empty its ring buffer fast enough + prepare_for_hangup(j); + } + } + } + + if (peers[idx].extra.sender.msg_len - i - 1 > 0) + memmove(peers[idx].extra.sender.message, peers[idx].extra.sender.message + i + 1, + peers[idx].extra.sender.msg_len - i - 1); + + peers[idx].extra.sender.msg_len -= i + 1; + } + return jsons_received; +} + +void do_send_to_client(int index) { + struct client_s *client; + int bytes_to_send, bytes_sent; + + if (will_hangup(index) || peers[index].type != PEER_TYPE_CLIENT) + return; + client = &peers[index].extra.client; + + if (client->tail == client->head) + return; + + if (client->head > client->tail) { // no wrap necessary + bytes_to_send = client->head - client->tail; + } else { // data wraps; let's send just the first part right now + bytes_to_send = CLIENT_BUFFER_SIZE - client->tail; + } + bytes_sent = write(peers[index].fd, + client->circular_buffer + client->tail, + bytes_to_send + ); + if (bytes_sent <= 0) { + prepare_for_hangup(index); + } else { + client->tail = (client->tail + bytes_sent) % CLIENT_BUFFER_SIZE; + } +} + +void print_help(char *self_name, int sender_port, int client_port) { + printf("Usage: %s [OPTIONS]\n", self_name); + printf("Options:\n"); + printf(" -h, --help Display this help message and quit\n"); + printf(" -v, --verbose Increase verbose level (up to 5)\n"); + printf(" -s , --sender-port Set sender port number (current value: %d)\n", sender_port); + printf(" -c , --client-port Set client port number (current value: %d)\n", client_port); +} + +void main(int argc, char *argv[]) { + verbose_level = 0; + int sender_port = 5000; + int client_port = 5001; + int opt; + + n_peers = 2; + max_peers = PEER_POOL_INCREMENT; + peers = malloc(sizeof(struct peer_s) * max_peers); + if (!peers) { + perror("Cannot allocate memory for clients"); + exit(1); + } + fds = malloc(sizeof(struct pollfd) * max_peers); + if (!fds) { + perror("Cannot allocate memory for clients (fds)"); + exit(1); + } + + // parse options + while (1) { + static struct option long_options[] = { + {"help", no_argument, 0, 'h'}, + {"verbose", optional_argument, 0, 'v'}, + {"sender-port", required_argument, 0, 's'}, + {"client-port", required_argument, 0, 'c'}, + {0, 0, 0, 0} + }; + int option_index = 0; + + opt = getopt_long(argc, argv, "hv:s:c:", long_options, &option_index); + + if (opt == -1) { + break; + } + + switch (opt) { + case 'h': + print_help(argv[0], sender_port, client_port); + exit(0); + case 'v': + if (optarg) { + verbose_level = atoi(optarg); + } else { + verbose_level = 1; + } + if (verbose_level > 5) { + verbose_level = 5; + } + break; + case 's': + sender_port = atoi(optarg); + if (sender_port < 1 || sender_port > 65535) { + fprintf(stderr, "Error: Sender port must be between 1 and 65535\n"); + exit(1); + } + break; + case 'c': + client_port = atoi(optarg); + if (client_port < 1 || client_port > 65535) { + fprintf(stderr, "Error: Client port must be between 1 and 65535\n"); + exit(1); + } + break; + default: + print_help(argv[0], sender_port, client_port); + exit(1); + } + } + + peers[0].type = -1; + peers[0].fd = setup_server_socket(sender_port); + fds[0].fd = peers[0].fd; + fds[0].events = POLLIN; + + peers[1].type = -1; + peers[1].fd = setup_server_socket(client_port); + fds[1].fd = peers[1].fd; + fds[1].events = POLLIN; + + // main loop + while (1) { + // modify poll() preferences for consumers + for (int i = 2; i < n_peers; i++) { + if (peers[i].type != PEER_TYPE_CLIENT || will_hangup(i)) + continue; + if (peers[i].extra.client.head == peers[i].extra.client.tail) + fds[i].events = POLLIN; + else + fds[i].events = POLLIN | POLLOUT; + } + int ret = poll(fds, n_peers, -1); + if (ret < 0) { + perror("Poll failed"); + sleep(1); + continue; + } + + // accept new sender + if (fds[0].revents & POLLIN) { + accept_peer(fds[0].fd, PEER_TYPE_SENDER); + } + // accept new client + if (fds[1].revents & POLLIN) { + accept_peer(fds[1].fd, PEER_TYPE_CLIENT); + } + + // handle disconnecting clients + for (int i = 2; i < n_peers; i++) { + if (will_hangup(i) || !(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL))) + continue; + + prepare_for_hangup(i); + } + + // handle output first + for (int i = 2; i < n_peers; i++) { + if (will_hangup(i) || !(fds[i].revents & POLLOUT)) + continue; + do_send_to_client(i); + } + + // last, handle new inputs + for (int i = 2; i < n_peers; i++) { + if (will_hangup(i) || !(fds[i].revents & POLLIN)) + continue; + + if (peers[i].type == PEER_TYPE_CLIENT) { + char toilet[512]; + if (read(peers[i].fd, toilet, sizeof(toilet)) <= 0) + prepare_for_hangup(i); + } else { + receive_jsons(i); + } + } + + // hangup some peers + hangup_peers(); + } +} + diff --git a/src/core/sse/sse.py b/src/core/sse/sse.py new file mode 100644 index 000000000..bdf3ecb08 --- /dev/null +++ b/src/core/sse/sse.py @@ -0,0 +1,20 @@ +import socket +import json + + +class SSE: + def __init__(self): + pass + + def publish(self, data, event, channel=None): + message = self.format_sse(data, event) + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.connect(('localhost', 5000)) + sock.sendall(message.encode('utf-8')) + except Exception as e: + print(f"Error sending data: {e}") + + @staticmethod + def format_sse(data, event=None) -> str: + return json.dumps({"event": event, "data": data})