Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add capability to monitor for silence in geographic gathering #89

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,21 @@ perhaps from `trollstalker`_ or `segment-gatherer`_. Using the configured
granule duration and the area of interest, it calculates the starting times
of granules it should expect to be covered in this area before and after the
granule it was messaged about. Collection is considered finished when either
all expected granules have been collected or when a timeout is reached,
whatever comes first. Timeout is configured with the ``timeliness`` option
(see below).
of three conditions is reached:

- All expected granules have been collected.
- A timeout is reached due to the ``timeliness`` option. This timeout is
calculated based on expected *remaining* granules. That means the timeout
can change if the last granule is collected. For example, we expect
granules at times 0, 3, 6, 9, and 12. Granule duration is 3 minutes and
timeliness is 5 minutes. Initially the timeout is set at t=12+3+5=20. But
if we collect 0, 6, 9, and 12 (but not 3), then after 12 has been collected,
timeout is adjusted to 3+3+5=11. Since the granule at t=12 is probably
collected when the clock time is later than t=11, the collection of the final granule
at t=12 leads to an immediate trigger of the timeout after the collection of t=12.
- No granules are collected at all for a period of ``silence`` seconds.
Considering the previous example, if we collect 3, 6, 9, but not 12; if
silence is set to 5 minutes, then the timeout will be reached at t=9+5=14.

.. _pytroll-schedule: http://pytroll-schedule.readthedocs.org/
.. _pyorbital: https://pyorbital.readthedocs.io/en/latest/
Expand Down Expand Up @@ -183,6 +195,11 @@ timeliness
``timeliness`` minutes after the expected end time of the last expected
granule.

silence
Monitor for silence for this time (in seconds). If no messages are
received at all for this period, ship what we have regardless of other
timeouts.

And the following optional fields:

service
Expand Down Expand Up @@ -218,7 +235,7 @@ orbit_type
nameserver
Nameserver to use to publish posttroll messages.

.. literalinclude:: ../../examples/gatherer_config.ini_template
.. literalinclude:: ../../examples/geographic_gatherer_config.ini_template
:language: ini

scisys_receiver
Expand Down
3 changes: 3 additions & 0 deletions examples/geographic_gatherer_config.ini_template
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ sensor = viirs
timeliness = 10
# duration of a granule in SECONDS
duration = 180
# silence monitoring in seconds. Collection is stopped if no granules/messages
# are received for this amount of time.
# silence = 600
publish_topic =

[ears_viirs]
Expand Down
32 changes: 27 additions & 5 deletions pytroll_collectors/region_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,22 @@ class RegionCollector(object):

def __init__(self, region,
timeliness=None,
granule_duration=None):
"""Initialize the region collector."""
granule_duration=None,
silence=timedelta(days=9999)):
"""Initialize the region collector.

Args:
region (AreaDefinition): Area for which to collect granules.
timeliness (datetime.timedelta): Timeout after latest expected
granule. This timeout will be reached if the clock time
exceeds the time for the latest expected granule +
granule_duration + timeliness.
granule_duration (datetime.timedelta): Duration for a granule.
This will be used to calculate expected granules and to
estimate the timeliness-timeout (see above).
silence (datetime.timedelta): Regardless of what we expect, abort
collection if nothing has been received for this duration.
"""
self.region = region # area def
self.granule_times = set()
self.granules = []
Expand All @@ -58,6 +72,7 @@ def __init__(self, region,
self.timeout = None
self.granule_duration = granule_duration
self.last_file_added = False
self.silence = silence

def __call__(self, granule_metadata):
"""Perform the collection on the granule."""
Expand Down Expand Up @@ -146,6 +161,7 @@ def _adjust_timeout(self):
self.granule_times) +
self.granule_duration +
self.timeliness)
silence_timeout = datetime.now() + self.silence
except ValueError:
logger.error("Calculation of new timeout failed, "
"keeping previous timeout.")
Expand All @@ -155,6 +171,10 @@ def _adjust_timeout(self):
if new_timeout < self.timeout:
self.timeout = new_timeout
logger.info("Adjusted timeout: %s", self.timeout.isoformat())
elif silence_timeout > self.timeout:
self.timeout = min(new_timeout, silence_timeout)
if silence_timeout > new_timeout:
logger.debug(f"Silence timeout: {self.timeout:%Y-%m-%d %H:%M:%S}")

def cleanup(self):
"""Clear members."""
Expand Down Expand Up @@ -200,9 +220,11 @@ def _predict_pass_granules(self, granule_metadata):
_get_platform_name(granule_metadata),
self.region.description,
str(sorted(self.planned_granule_times)))
self.timeout = (max(self.planned_granule_times) +
self.granule_duration +
self.timeliness)
self.timeout = min(
datetime.now() + self.silence,
(max(self.planned_granule_times) +
self.granule_duration +
self.timeliness))
logger.info("Planned timeout for %s: %s", self.region.description,
self.timeout.isoformat())
else:
Expand Down
15 changes: 14 additions & 1 deletion pytroll_collectors/tests/test_geographic_gatherer.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def setUp(self):
'publish_topic': '/topic',
'watcher': 'Observer',
}
self.config["silence_section"] = {**self.config["minimal_config"],
"silence": 300}

self.RegionCollector = self._patch_and_add_cleanup(
'pytroll_collectors.geographic_gatherer.RegionCollector')
Expand Down Expand Up @@ -185,6 +187,17 @@ def test_init_observer(self):
self._watchdog_test(
sections, gatherer, self.publisher, self.PostTrollTrigger, self.WatchDogTrigger, self.RegionCollector)

def test_init_silence(self):
"""Test initialisation of GeographicGatherer with silence.

Test that the GeographicGatherer is correctly initiated when monitoring
for silence is included in the configuration.
"""
from pytroll_collectors.geographic_gatherer import GeographicGatherer
sections = ["silence_section"]
opts = FakeOpts(sections)
GeographicGatherer(self.config, opts)

def _watchdog_test(self, sections, gatherer, publisher, PostTrollTrigger, WatchDogTrigger, RegionCollector):
# There's one trigger
assert len(gatherer.triggers) == 1
Expand Down Expand Up @@ -229,7 +242,7 @@ def test_init_all_sections(self):
assert len(gatherer.triggers) == num_sections

# See that the trigger classes have been accessed the correct times
assert self.PostTrollTrigger.call_count == 2
assert self.PostTrollTrigger.call_count == 3
assert self.WatchDogTrigger.call_count == 2

# N regions for each section
Expand Down
39 changes: 39 additions & 0 deletions pytroll_collectors/tests/test_region_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,58 @@ def test_adjust_timeout(europe, caplog):
"uri": "file://alt/0"}
alt_europe_collector = RegionCollector(
europe,
timeliness=datetime.timedelta(seconds=600),
granule_duration=datetime.timedelta(seconds=180))

with caplog.at_level(logging.DEBUG):
alt_europe_collector.collect(
{**granule_metadata,
"start_time": datetime.datetime(2021, 4, 11, 10, 0)})
assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 28)
alt_europe_collector.collect(
{**granule_metadata,
"start_time": datetime.datetime(2021, 4, 11, 10, 15)})
alt_europe_collector.collect(
{**granule_metadata,
"start_time": datetime.datetime(2021, 4, 11, 10, 12)})
assert "Adjusted timeout" in caplog.text
assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 22)


@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen)
def test_silence_timeout(europe, caplog):
"""Test that the monitor for silence timeout is working."""
from pytroll_collectors.region_collector import RegionCollector
granule_metadata = {
"sensor": "avhrr",
"tle_platform_name": "Metop-C",
"uri": "file://alt/0"}
alt_europe_collector = RegionCollector(
europe,
timeliness=datetime.timedelta(seconds=60),
granule_duration=datetime.timedelta(seconds=180),
silence=datetime.timedelta(seconds=900))
caplog.set_level(logging.DEBUG)
with unittest.mock.patch("pytroll_collectors.region_collector.datetime") as prd:
prd.now.return_value = datetime.datetime(2021, 4, 11, 10, 1)
alt_europe_collector.collect(
{**granule_metadata,
"start_time": datetime.datetime(2021, 4, 11, 10, 0)})
# earliest timeout is due to monitor for silence
assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 16)
prd.now.return_value = datetime.datetime(2021, 4, 11, 10, 4)
alt_europe_collector.collect(
{**granule_metadata,
"start_time": datetime.datetime(2021, 4, 11, 10, 3)})
assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 19)
assert "Planned timeout for euro_ma: 2021-04-11T10:19" in caplog.text
prd.now.return_value = datetime.datetime(2021, 4, 11, 10, 16)
alt_europe_collector.collect(
{**granule_metadata,
"start_time": datetime.datetime(2021, 4, 11, 10, 15)})
# earliest timeout is due to duration + timeliness
assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 16)
assert "Silence timeout: 2021-04-11 10:16" not in caplog.text


@pytest.mark.skip(reason="test never finishes")
Expand Down