diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py index deb63479b1..1f674b66ff 100644 --- a/synapse/lib/agenda.py +++ b/synapse/lib/agenda.py @@ -5,9 +5,7 @@ import logging import calendar import datetime -import functools import itertools -import collections from datetime import timezone as tz from collections.abc import Iterable, Mapping @@ -886,8 +884,10 @@ async def _runJob(self, user, appt): took = finishtime - starttime mesg = f'Agenda completed query for iden={appt.iden} name={appt.name} with result "{result}" ' \ f'took {took:.3f}s' + if not self.core.isactive: mesg = mesg + ' Agenda status will not be saved since the Cortex is no longer the leader.' + logger.info(mesg, extra={'synapse': {'iden': appt.iden, 'name': appt.name, 'user': user.iden, 'result': result, 'username': user.name, 'took': took}}) edits = { @@ -895,6 +895,7 @@ async def _runJob(self, user, appt): 'isrunning': False, 'lastresult': result, } + if self.core.isactive: await self.core.addCronEdits(appt.iden, edits) diff --git a/synapse/tests/test_lib_agenda.py b/synapse/tests/test_lib_agenda.py index 5fd810a56c..a353059495 100644 --- a/synapse/tests/test_lib_agenda.py +++ b/synapse/tests/test_lib_agenda.py @@ -9,8 +9,6 @@ import synapse.common as s_common import synapse.tests.utils as s_t_utils -import synapse.lib.hive as s_hive -import synapse.lib.lmdbslab as s_lmdbslab import synapse.tools.backup as s_tools_backup import synapse.lib.agenda as s_agenda @@ -829,6 +827,80 @@ async def test_agenda_mirror_realtime(self): data = stream.read() self.isin("_Appt.edits() Invalid attribute received: invalid = 'newp'", data) + async def test_agenda_promotions(self): + async with self.getTestAha() as aha: + + conf00 = { + 'aha:provision': await aha.addAhaSvcProv('00.cortex') + } + + async with self.getTestCore(conf=conf00) as core00: + self.false(core00.conf.get('mirror')) + + await core00.callStorm('cron.add --minute +1 { $lib.time.sleep(90) }') + + prov01 = {'mirror': '00.cortex'} + conf01 = { + 'aha:provision': await aha.addAhaSvcProv('01.cortex', provinfo=prov01), + } + + async with self.getTestCore(conf=conf01) as core01: + core00.agenda._addTickOff(55) + + mesgs = [] + async for mesg in core00.behold(): + mesgs.append(mesg) + break + + self.eq(mesgs[0]['event'], 'cron:start') + + # Inspect crons and tasks + crons00 = await core00.callStorm('return($lib.cron.list())') + self.len(1, crons00) + self.true(crons00[0].get('isrunning')) + + crons01 = await core01.callStorm('return($lib.cron.list())') + self.len(1, crons01) + self.false(crons01[0].get('isrunning')) + + tasks00 = await core00.callStorm('return($lib.ps.list())') + # Two tasks: one for the cronjob and one for the cronjob instance + self.len(2, tasks00) + + tasks01 = await core01.callStorm('return($lib.ps.list())') + self.len(0, tasks01) + + # Promote and inspect cortex status + await core01.promote(graceful=True) + self.false(core00.isactive) + self.true(core01.isactive) + + # Bump the ticks on core01 so the cron job starts + core01.agenda._addTickOff(55) + + mesgs = [] + async for mesg in core01.behold(): + mesgs.append(mesg) + break + + self.eq(mesgs[0]['event'], 'cron:start') + + crons00 = await core00.callStorm('return($lib.cron.list())') + self.len(1, crons00) + self.false(crons00[0].get('isrunning')) + + crons01 = await core01.callStorm('return($lib.cron.list())') + self.len(1, crons01) + self.true(crons01[0].get('isrunning')) + + tasks00 = await core00.callStorm('return($lib.ps.list())') + # This task is the leftover cronjob from before promotion + self.len(1, tasks00) + + tasks01 = await core01.callStorm('return($lib.ps.list())') + # This should be two? One for the cronjob and one for the instance. + self.len(1, tasks01) + async def test_cron_kill(self): async with self.getTestCore() as core: