Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSquires committed Nov 13, 2024
1 parent 294696b commit e206dc7
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
5 changes: 3 additions & 2 deletions synapse/lib/agenda.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
import logging
import calendar
import datetime
import functools
import itertools
import collections
from datetime import timezone as tz
from collections.abc import Iterable, Mapping

Expand Down Expand Up @@ -886,15 +884,18 @@ async def _runJob(self, user, appt):
took = finishtime - starttime
mesg = f'Agenda completed query for iden={appt.iden} name={appt.name} with result "{result}" ' \
f'took {took:.3f}s'

if not self.core.isactive:
mesg = mesg + ' Agenda status will not be saved since the Cortex is no longer the leader.'

logger.info(mesg, extra={'synapse': {'iden': appt.iden, 'name': appt.name, 'user': user.iden,
'result': result, 'username': user.name, 'took': took}})
edits = {
'lastfinishtime': finishtime,
'isrunning': False,
'lastresult': result,
}

if self.core.isactive:
await self.core.addCronEdits(appt.iden, edits)

Expand Down
76 changes: 74 additions & 2 deletions synapse/tests/test_lib_agenda.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import synapse.common as s_common
import synapse.tests.utils as s_t_utils

import synapse.lib.hive as s_hive
import synapse.lib.lmdbslab as s_lmdbslab
import synapse.tools.backup as s_tools_backup

import synapse.lib.agenda as s_agenda
Expand Down Expand Up @@ -829,6 +827,80 @@ async def test_agenda_mirror_realtime(self):
data = stream.read()
self.isin("_Appt.edits() Invalid attribute received: invalid = 'newp'", data)

async def test_agenda_promotions(self):
async with self.getTestAha() as aha:

conf00 = {
'aha:provision': await aha.addAhaSvcProv('00.cortex')
}

async with self.getTestCore(conf=conf00) as core00:
self.false(core00.conf.get('mirror'))

await core00.callStorm('cron.add --minute +1 { $lib.time.sleep(90) }')

prov01 = {'mirror': '00.cortex'}
conf01 = {
'aha:provision': await aha.addAhaSvcProv('01.cortex', provinfo=prov01),
}

async with self.getTestCore(conf=conf01) as core01:
core00.agenda._addTickOff(55)

mesgs = []
async for mesg in core00.behold():
mesgs.append(mesg)
break

self.eq(mesgs[0]['event'], 'cron:start')

# Inspect crons and tasks
crons00 = await core00.callStorm('return($lib.cron.list())')
self.len(1, crons00)
self.true(crons00[0].get('isrunning'))

crons01 = await core01.callStorm('return($lib.cron.list())')
self.len(1, crons01)
self.false(crons01[0].get('isrunning'))

tasks00 = await core00.callStorm('return($lib.ps.list())')
# Two tasks: one for the cronjob and one for the cronjob instance
self.len(2, tasks00)

tasks01 = await core01.callStorm('return($lib.ps.list())')
self.len(0, tasks01)

# Promote and inspect cortex status
await core01.promote(graceful=True)
self.false(core00.isactive)
self.true(core01.isactive)

# Bump the ticks on core01 so the cron job starts
core01.agenda._addTickOff(55)

mesgs = []
async for mesg in core01.behold():
mesgs.append(mesg)
break

self.eq(mesgs[0]['event'], 'cron:start')

crons00 = await core00.callStorm('return($lib.cron.list())')
self.len(1, crons00)
self.false(crons00[0].get('isrunning'))

crons01 = await core01.callStorm('return($lib.cron.list())')
self.len(1, crons01)
self.true(crons01[0].get('isrunning'))

tasks00 = await core00.callStorm('return($lib.ps.list())')
# This task is the leftover cronjob from before promotion
self.len(1, tasks00)

tasks01 = await core01.callStorm('return($lib.ps.list())')
# This should be two? One for the cronjob and one for the instance.
self.len(1, tasks01)

async def test_cron_kill(self):
async with self.getTestCore() as core:

Expand Down

0 comments on commit e206dc7

Please sign in to comment.