diff --git a/.gitignore b/.gitignore index a71672a..26951d8 100644 --- a/.gitignore +++ b/.gitignore @@ -128,3 +128,4 @@ dmypy.json # Pyre type checker .pyre/ test-data/WIGOS_0-454-2-AWSCHIKANGAWA_20221109T135500.bufr4 +test-config.yml diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..5cb8686 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.9.17-slim-buster + +ENV LOG_LEVEL=INFO + +# Set the working directory to /app/pywis-pubsub +WORKDIR /app/pywis-pubsub + +# first install requirements to speed up rebuilds +COPY requirements.txt /app/pywis-pubsub +RUN pip3 install -r requirements.txt + +# Copy the current directory contents into the container at /app/pywis-pubsub +COPY . /app/pywis-pubsub + +# install pywis-pubsub +RUN python3 setup.py install + +# run pywis-pubsub subscribe +CMD pywis-pubsub subscribe --config /tmp/local.yml --verbosity $LOG_LEVEL diff --git a/pywis_pubsub/subscribe.py b/pywis_pubsub/subscribe.py index 283d96a..81fb587 100644 --- a/pywis_pubsub/subscribe.py +++ b/pywis_pubsub/subscribe.py @@ -26,6 +26,10 @@ import click +import threading +import time +import os + from pywis_pubsub import cli_options from pywis_pubsub import util from pywis_pubsub.geometry import is_message_within_bbox @@ -36,6 +40,7 @@ from pywis_pubsub.validation import validate_message from pywis_pubsub.verification import data_verified +import urllib3 LOGGER = logging.getLogger(__name__) @@ -169,6 +174,64 @@ def on_message_handler(client, userdata, msg): LOGGER.error(msg, exc_info=True) +def subscription_polling( + client: MQTTPubSubClient, + url: str, + interval: int, + qos: int, + timeout: int = 5) -> None: + """ + Polls a subscription URL and subscribes to topics that are returned. + Unsubscribes from topics that are not returned. + + args: + client: MQTTPubSubClient + Client to subscribe/unsubscribe with + url: str + URL to poll for subscriptions + interval: int + Interval in seconds to poll the URL + qos: int + QoS to subscribe/unsubscribe with + timeout: int + Timeout in seconds for the HTTP request + """ + + subscriptions = [] + + LOGGER.info(f'Starting subscription polling with interval {interval}') + LOGGER.info(f'Polling URL: {url}') + # while-loop that checks if topics need to be updated + loop = True + while loop: + try: + http = urllib3.PoolManager() + response = http.request('GET', url, timeout=timeout) + status = response.status + if status != 200: + LOGGER.error(f"HTTP error while polling URL: {status}") + time.sleep(interval) + continue + new_subscriptions = json.loads(response.data.decode('utf-8')) + except Exception as e: + LOGGER.error(f"Error while parsing subscription URL response: {e}") + time.sleep(interval) + continue + LOGGER.info(f"Received {len(new_subscriptions)} subscriptions") + for topic in subscriptions: + LOGGER.debug(f"Checking if {topic} is still subscribed") + if topic not in new_subscriptions: + LOGGER.info(f"Unsubscribing from {topic}") + client.conn.unsubscribe(topic) + for new_topic in new_subscriptions: + LOGGER.debug(f"Checking if {new_topic} is already subscribed") + if new_topic not in subscriptions: + LOGGER.info(f"Subscribing to {new_topic}") + client.conn.subscribe(new_topic, qos=qos) + subscriptions = new_subscriptions + time.sleep(interval) + + @click.command() @click.pass_context @cli_options.OPTION_CONFIG @@ -178,6 +241,8 @@ def on_message_handler(client, userdata, msg): def subscribe(ctx, config, download, bbox=[], verbosity='NOTSET'): """Subscribe to a broker/topic and optionally download data""" + click.echo('Starting pywis-pubsub subscribe') + if config is None: raise click.ClickException('missing --config') config = util.yaml_load(config) @@ -188,7 +253,6 @@ def subscribe(ctx, config, download, bbox=[], verbosity='NOTSET'): qos = int(config.get('qos', 1)) subscribe_topics = config.get('subscribe_topics', []) verify_certs = config.get('verify_certs', True) - options = { 'verify_certs': verify_certs } @@ -208,5 +272,22 @@ def subscribe(ctx, config, download, bbox=[], verbosity='NOTSET'): client = MQTTPubSubClient(broker, options) client.bind('on_message', on_message_handler) click.echo(f'Connected to broker {client.broker_safe_url}') - click.echo(f'Subscribing to subscribe_topics {subscribe_topics}') + + if config.get('subscription_polling_enabled', False): + click.echo('Subscription polling enabled') + sub_url = config.get('subscription_polling_url') + sub_interval = config.get('subscription_polling_interval', 5) + click.echo(f'Subscription polling URL: {sub_url}') + click.echo(f'Subscription polling interval: {sub_interval}') + if sub_url is None: + error = 'missing subscription_polling_url in config' + raise click.ClickException(error) + # start subscription polling in separate thread + polling_thread = threading.Thread( + target=subscription_polling, + args=(client, sub_url, sub_interval, qos)) + polling_thread.start() + else: + click.echo('Subscription polling disabled') + click.echo(f'Subscribing to subscribe_topics {subscribe_topics}') client.sub(subscribe_topics, qos)