Skip to content

Commit

Permalink
Merge pull request #156 from kalaspuff/fix/crontab-isoweekday-days-co…
Browse files Browse the repository at this point in the history
…nflict

Fixes invalid crontab implementation which didn't separate isoweekday and day part of the crontab notation
  • Loading branch information
kalaspuff authored Aug 24, 2017
2 parents 11ff6d2 + 72077aa commit 092294c
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 11 deletions.
72 changes: 67 additions & 5 deletions tests/test_crontab_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,45 @@ def test_parser() -> None:
assert get_next_datetime('5,10,55 * * * *', t) == datetime.datetime(2017, 6, 15, 10, 55)


def test_isoweekday() -> None:
t = datetime.datetime(2017, 6, 15, 10, 16, 50)
assert get_next_datetime('* * * * 0', t) == datetime.datetime(2017, 6, 18, 0, 0)
assert get_next_datetime('* * * * 1', t) == datetime.datetime(2017, 6, 19, 0, 0)
assert get_next_datetime('* * * * 2', t) == datetime.datetime(2017, 6, 20, 0, 0)
assert get_next_datetime('* * * * 3', t) == datetime.datetime(2017, 6, 21, 0, 0)
assert get_next_datetime('* * * * 4', t) == datetime.datetime(2017, 6, 15, 10, 17)
assert get_next_datetime('* * * * 5', t) == datetime.datetime(2017, 6, 16, 0, 0)
assert get_next_datetime('* * * * 6', t) == datetime.datetime(2017, 6, 17, 0, 0)
assert get_next_datetime('* * * * 7', t) == datetime.datetime(2017, 6, 18, 0, 0)

assert get_next_datetime('* * * * mon-tue', t) == datetime.datetime(2017, 6, 19, 0, 0)

t = datetime.datetime(2017, 6, 19, 10, 16, 50)
assert get_next_datetime('* * * * 5-6', t) == datetime.datetime(2017, 6, 23, 0, 0)
assert get_next_datetime('* * * * 0-4', t) == datetime.datetime(2017, 6, 19, 10, 17)
assert get_next_datetime('* * * * 5-7', t) == datetime.datetime(2017, 6, 23, 0, 0)

assert get_next_datetime('* * * * fri-sat', t) == datetime.datetime(2017, 6, 23, 0, 0)
assert get_next_datetime('* * * * sun-fri', t) == datetime.datetime(2017, 6, 19, 10, 17)
assert get_next_datetime('* * * * fri-sun', t) == datetime.datetime(2017, 6, 23, 0, 0)

assert get_next_datetime('* * * * 0-7', t) == datetime.datetime(2017, 6, 19, 10, 17)

with pytest.raises(Exception):
get_next_datetime('* * * * wed-mon', t)


def test_days_with_isoweekday() -> None:
t = datetime.datetime(2017, 8, 24, 18, 1, 0)
assert get_next_datetime('0 10 1-7 * *', t) == datetime.datetime(2017, 9, 1, 10, 0)
assert get_next_datetime('0 10 * * mon', t) == datetime.datetime(2017, 8, 28, 10, 0)
assert get_next_datetime('0 10 1-7 * mon', t) == datetime.datetime(2017, 8, 28, 10, 0)
assert get_next_datetime('0 10 1-25 * mon', t) == datetime.datetime(2017, 8, 25, 10, 0)
assert get_next_datetime('0 * 1-25 * mon', t) == datetime.datetime(2017, 8, 24, 19, 0)
assert get_next_datetime('0 * 1-24 * mon', t) == datetime.datetime(2017, 8, 24, 19, 0)
assert get_next_datetime('0 * 1-23 * mon', t) == datetime.datetime(2017, 8, 28, 0, 0)


def test_timezones() -> None:
t = datetime.datetime(2017, 6, 15, 10, 16, 50, tzinfo=pytz.UTC)
assert get_next_datetime('* * * * *', t) == datetime.datetime(2017, 6, 15, 10, 17, tzinfo=pytz.UTC)
Expand All @@ -56,16 +95,28 @@ def test_advanced_parsing() -> None:

assert get_next_datetime('* * * * Lwed-fri', t) == datetime.datetime(2017, 6, 28)
assert get_next_datetime('* * 4-15 feb-jun wed-fri', t) == datetime.datetime(2017, 6, 15, 10, 17)
assert get_next_datetime('3-20/2 5 4-15 feb-may wed-fri', t) == datetime.datetime(2018, 2, 7, 5, 3)
assert get_next_datetime('3-20/2 5 4-15 feb-may *', t) == datetime.datetime(2018, 2, 4, 5, 3)
assert get_next_datetime('3-20/2 5 4-15 feb-may wed', t) == datetime.datetime(2018, 2, 4, 5, 3)
assert get_next_datetime('3-20/2 5 10-15 feb-may wed', t) == datetime.datetime(2018, 2, 7, 5, 3)
assert get_next_datetime('3-20/2 5 * feb-may wed', t) == datetime.datetime(2018, 2, 7, 5, 3)
assert get_next_datetime('3-20/2 5 4-15 feb-may mon,tue-wed', t) == datetime.datetime(2018, 2, 4, 5, 3)
assert get_next_datetime('3-20/2 5 4-15 feb-may wed-fri', t) == datetime.datetime(2018, 2, 1, 5, 3)
assert get_next_datetime('3-20/2 5 7-15 feb-may wed,tue,sat,mon', t) == datetime.datetime(2018, 2, 3, 5, 3)

assert get_next_datetime('* * 29 2 *', t) == datetime.datetime(2020, 2, 29, 0, 0)
assert get_next_datetime('* * 29 2 0', t) == datetime.datetime(2032, 2, 29, 0, 0)
assert get_next_datetime('* * 29 2 0', t) == datetime.datetime(2018, 2, 4, 0, 0)
assert get_next_datetime('* * 29 2 0 2020', t) == datetime.datetime(2020, 2, 2, 0, 0)

assert get_next_datetime('30 5 * jan,mar Ltue', t) == datetime.datetime(2018, 1, 30, 5, 30)

t = datetime.datetime(2011, 1, 10, 23, 59, 30)
assert get_next_datetime('0 0 1 jan/2 * 2011-2013', t) == datetime.datetime(2011, 3, 1)

t = datetime.datetime(2017, 6, 15, 10, 16, 50)
assert get_next_datetime('* * 29 2 mon 2048', t) == datetime.datetime(2048, 2, 3, 0, 0)
assert get_next_datetime('* * 29 2 * 2048', t) == datetime.datetime(2048, 2, 29, 0, 0)
assert get_next_datetime('* * 29 2 * 2049,2051-2060', t) == datetime.datetime(2052, 2, 29, 0, 0)


def test_impossible_dates() -> None:
t = datetime.datetime(2017, 6, 15, 10, 16, 50)
Expand All @@ -74,12 +125,23 @@ def test_impossible_dates() -> None:
with pytest.raises(Exception):
get_next_datetime('* * 29 2 * 2017-2019', t)

assert get_next_datetime('* * 29 2 mon 2048', t) is None

assert get_next_datetime('* * 29 2 mon 2048-2070,2073-2200', t) is None
with pytest.raises(Exception):
get_next_datetime('* * 29 2 * 2049-2050,2073-2075,2099-2101', t)

with pytest.raises(Exception):
get_next_datetime('70 * * *', t)

with pytest.raises(Exception):
get_next_datetime('* * 30 2 *', t)

with pytest.raises(Exception):
get_next_datetime('* * * * tue-mon', t)

with pytest.raises(Exception):
get_next_datetime('* * * x-dec *', t)

with pytest.raises(Exception):
get_next_datetime('* * * jan-y *', t)

with pytest.raises(Exception):
get_next_datetime('* * * nope *', t)
65 changes: 59 additions & 6 deletions tomodachi/helpers/crontab.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
('hour', (0, 23), {}),
('day', (1, 31), {}),
('month', (1, 12), {'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6, 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12}),
('isoweekday', (0, 6), {'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5, 'sat': 6, 'sun': 0}),
('isoweekday', (0, 7), {'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5, 'sat': 6, 'sun': 0}),
('year', (1970, 2099), {})
] # type: List[Tuple[str, Tuple[int, int], Dict[str, int]]]
crontab_aliases = {
Expand All @@ -30,10 +30,18 @@ def get_next_datetime(crontab_notation: str, now_date: datetime.datetime) -> Opt
values = []
last_day = False
last_weekday = False
use_weekdays = False
use_days = False
for i, attr in enumerate(cron_attributes):
cron_type, cron_range, aliases = attr # type: str, Tuple, Dict[str, int]
available_values = [] # type: Union[List[int], set]
parts = cron_parts[i].lower().split(',')

if attr[0] == 'isoweekday' and cron_parts[i] != '*':
use_weekdays = True
if attr[0] == 'day' and cron_parts[i] != '*':
use_days = True

for part in parts:
last = False
parsed = False
Expand All @@ -47,6 +55,12 @@ def get_next_datetime(crontab_notation: str, now_date: datetime.datetime) -> Opt
a_value = a_value[1:]
a = int(aliases.get(a_value, -1))
b = int(aliases.get(b_value, -1))

if attr[0] == 'isoweekday' and b < a:
if b_value == 'sun':
b = 7
else:
raise Exception('Invalid cron notation: invalid values for {} ({})'.format(attr[0], part))
if a < 0:
try:
a = int(a_value)
Expand Down Expand Up @@ -106,6 +120,9 @@ def get_next_datetime(crontab_notation: str, now_date: datetime.datetime) -> Opt
if last and attr[0] == 'isoweekday':
last_weekday = True

if attr[0] == 'isoweekday':
possible_values = [x if x != 7 else 0 for x in possible_values]

if not possible_values:
raise Exception('Invalid cron notation: invalid values for {}'.format(attr[0]))
if isinstance(available_values, list):
Expand All @@ -127,6 +144,8 @@ def calculate_date(input_date: datetime.datetime, last_day: bool, last_weekday:
next_date = input_date # type: Optional[datetime.datetime]
while True:
original_date = next_date
next_date_weekday = next_date

for i, attr in enumerate(cron_attributes):
if attr[0] == 'isoweekday':
continue
Expand All @@ -144,15 +163,49 @@ def calculate_date(input_date: datetime.datetime, last_day: bool, last_weekday:
next_date = None
break

if next_date and (next_date.isoweekday() % 7) not in values[4]:
for i, attr in enumerate(cron_attributes):
if attr[0] == 'isoweekday' or attr[0] == 'day':
continue
value = getattr(next_date_weekday, attr[0])
possible_values = [v for v in values[i] if v >= value]
if not possible_values:
if attr[0] == 'year':
return None
next_date_weekday = None
break
new_value = min(possible_values)
try:
next_date_weekday = tz.localize(datetime.datetime(*[getattr(next_date_weekday, dv) if dv != attr[0] else new_value for dv in ['year', 'month', 'day', 'hour', 'minute']]))
except ValueError:
next_date_weekday = None
break

if use_weekdays and next_date_weekday:
for d in range(next_date_weekday.day, monthrange(next_date_weekday.year, next_date_weekday.month)[1] + 1):
next_date_weekday = tz.localize(datetime.datetime(*[getattr(next_date_weekday, dv) if dv != 'day' else d for dv in ['year', 'month', 'day', 'hour', 'minute']]))
if next_date_weekday and (next_date_weekday.isoweekday() % 7) in values[4]:
break

if use_days and not use_weekdays:
next_date_weekday = None
if not use_days and use_weekdays:
next_date = None

if use_weekdays and next_date_weekday and (next_date_weekday.isoweekday() % 7) not in values[4] and next_date_weekday.isoweekday() not in values[4]:
next_date_weekday = None
if next_date and last_day and next_date.day != monthrange(next_date.year, next_date.month)[1]:
next_date = None
if next_date and last_weekday:
for i in range(next_date.day + 1, monthrange(next_date.year, next_date.month)[1] + 1):
if datetime.datetime(next_date.year, next_date.month, i).isoweekday() == next_date.isoweekday():
next_date = None
if use_weekdays and next_date_weekday and last_weekday:
for i in range(next_date_weekday.day + 1, monthrange(next_date_weekday.year, next_date_weekday.month)[1] + 1):
if datetime.datetime(next_date_weekday.year, next_date_weekday.month, i).isoweekday() == next_date_weekday.isoweekday():
next_date_weekday = None
break

if use_weekdays and not next_date and next_date_weekday:
next_date = next_date_weekday
elif use_weekdays and next_date and next_date_weekday and next_date_weekday < next_date:
next_date = next_date_weekday

if not next_date:
next_date = original_date
if next_date:
Expand Down

0 comments on commit 092294c

Please sign in to comment.