Skip to content

Commit

Permalink
add subscriber polling and dockerfile
Browse files Browse the repository at this point in the history
  • Loading branch information
maaikelimper committed Nov 10, 2023
1 parent 8673fe0 commit fbfed37
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,4 @@ dmypy.json
# Pyre type checker
.pyre/
test-data/WIGOS_0-454-2-AWSCHIKANGAWA_20221109T135500.bufr4
test-config.yml
19 changes: 19 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
FROM python:3.9.17-slim-buster

ENV LOG_LEVEL=INFO

# Set the working directory to /app/pywis-pubsub
WORKDIR /app/pywis-pubsub

# first install requirements to speed up rebuilds
COPY requirements.txt /app/pywis-pubsub
RUN pip3 install -r requirements.txt

# Copy the current directory contents into the container at /app/pywis-pubsub
COPY . /app/pywis-pubsub

# install pywis-pubsub
RUN python3 setup.py install

# run pywis-pubsub subscribe
CMD pywis-pubsub subscribe --config /tmp/local.yml --verbosity $LOG_LEVEL
85 changes: 83 additions & 2 deletions pywis_pubsub/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@

import click

import threading
import time
import os

Check failure on line 31 in pywis_pubsub/subscribe.py

View workflow job for this annotation

GitHub Actions / flake8_py3

pywis_pubsub/subscribe.py#L31

[F401]

from pywis_pubsub import cli_options
from pywis_pubsub import util
from pywis_pubsub.geometry import is_message_within_bbox
Expand All @@ -36,6 +40,7 @@
from pywis_pubsub.validation import validate_message
from pywis_pubsub.verification import data_verified

import urllib3

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -169,6 +174,64 @@ def on_message_handler(client, userdata, msg):
LOGGER.error(msg, exc_info=True)


def subscription_polling(
client: MQTTPubSubClient,
url: str,
interval: int,
qos: int,
timeout: int = 5) -> None:
"""
Polls a subscription URL and subscribes to topics that are returned.
Unsubscribes from topics that are not returned.
args:
client: MQTTPubSubClient
Client to subscribe/unsubscribe with
url: str
URL to poll for subscriptions
interval: int
Interval in seconds to poll the URL
qos: int
QoS to subscribe/unsubscribe with
timeout: int
Timeout in seconds for the HTTP request
"""

subscriptions = []

LOGGER.info(f'Starting subscription polling with interval {interval}')
LOGGER.info(f'Polling URL: {url}')
# while-loop that checks if topics need to be updated
loop = True
while loop:
try:
http = urllib3.PoolManager()
response = http.request('GET', url, timeout=timeout)
status = response.status
if status != 200:
LOGGER.error(f"HTTP error while polling URL: {status}")
time.sleep(interval)
continue
new_subscriptions = json.loads(response.data.decode('utf-8'))
except Exception as e:
LOGGER.error(f"Error while parsing subscription URL response: {e}")
time.sleep(interval)
continue
LOGGER.info(f"Received {len(new_subscriptions)} subscriptions")
for topic in subscriptions:
LOGGER.debug(f"Checking if {topic} is still subscribed")
if topic not in new_subscriptions:
LOGGER.info(f"Unsubscribing from {topic}")
client.conn.unsubscribe(topic)
for new_topic in new_subscriptions:
LOGGER.debug(f"Checking if {new_topic} is already subscribed")
if new_topic not in subscriptions:
LOGGER.info(f"Subscribing to {new_topic}")
client.conn.subscribe(new_topic, qos=qos)
subscriptions = new_subscriptions
time.sleep(interval)


@click.command()
@click.pass_context
@cli_options.OPTION_CONFIG
Expand All @@ -178,6 +241,8 @@ def on_message_handler(client, userdata, msg):
def subscribe(ctx, config, download, bbox=[], verbosity='NOTSET'):
"""Subscribe to a broker/topic and optionally download data"""

click.echo('Starting pywis-pubsub subscribe')

if config is None:
raise click.ClickException('missing --config')
config = util.yaml_load(config)
Expand All @@ -188,7 +253,6 @@ def subscribe(ctx, config, download, bbox=[], verbosity='NOTSET'):
qos = int(config.get('qos', 1))
subscribe_topics = config.get('subscribe_topics', [])
verify_certs = config.get('verify_certs', True)

options = {
'verify_certs': verify_certs
}
Expand All @@ -208,5 +272,22 @@ def subscribe(ctx, config, download, bbox=[], verbosity='NOTSET'):
client = MQTTPubSubClient(broker, options)
client.bind('on_message', on_message_handler)
click.echo(f'Connected to broker {client.broker_safe_url}')
click.echo(f'Subscribing to subscribe_topics {subscribe_topics}')

if config.get('subscription_polling_enabled', False):
click.echo('Subscription polling enabled')
sub_url = config.get('subscription_polling_url')
sub_interval = config.get('subscription_polling_interval', 5)
click.echo(f'Subscription polling URL: {sub_url}')
click.echo(f'Subscription polling interval: {sub_interval}')
if sub_url is None:
error = 'missing subscription_polling_url in config'
raise click.ClickException(error)
# start subscription polling in separate thread
polling_thread = threading.Thread(
target=subscription_polling,
args=(client, sub_url, sub_interval, qos))
polling_thread.start()
else:
click.echo('Subscription polling disabled')
click.echo(f'Subscribing to subscribe_topics {subscribe_topics}')
client.sub(subscribe_topics, qos)

0 comments on commit fbfed37

Please sign in to comment.