diff --git a/tests/test_crontab_parser.py b/tests/test_crontab_parser.py index 39d3b9a83..c28de3ffc 100644 --- a/tests/test_crontab_parser.py +++ b/tests/test_crontab_parser.py @@ -30,6 +30,45 @@ def test_parser() -> None: assert get_next_datetime('5,10,55 * * * *', t) == datetime.datetime(2017, 6, 15, 10, 55) +def test_isoweekday() -> None: + t = datetime.datetime(2017, 6, 15, 10, 16, 50) + assert get_next_datetime('* * * * 0', t) == datetime.datetime(2017, 6, 18, 0, 0) + assert get_next_datetime('* * * * 1', t) == datetime.datetime(2017, 6, 19, 0, 0) + assert get_next_datetime('* * * * 2', t) == datetime.datetime(2017, 6, 20, 0, 0) + assert get_next_datetime('* * * * 3', t) == datetime.datetime(2017, 6, 21, 0, 0) + assert get_next_datetime('* * * * 4', t) == datetime.datetime(2017, 6, 15, 10, 17) + assert get_next_datetime('* * * * 5', t) == datetime.datetime(2017, 6, 16, 0, 0) + assert get_next_datetime('* * * * 6', t) == datetime.datetime(2017, 6, 17, 0, 0) + assert get_next_datetime('* * * * 7', t) == datetime.datetime(2017, 6, 18, 0, 0) + + assert get_next_datetime('* * * * mon-tue', t) == datetime.datetime(2017, 6, 19, 0, 0) + + t = datetime.datetime(2017, 6, 19, 10, 16, 50) + assert get_next_datetime('* * * * 5-6', t) == datetime.datetime(2017, 6, 23, 0, 0) + assert get_next_datetime('* * * * 0-4', t) == datetime.datetime(2017, 6, 19, 10, 17) + assert get_next_datetime('* * * * 5-7', t) == datetime.datetime(2017, 6, 23, 0, 0) + + assert get_next_datetime('* * * * fri-sat', t) == datetime.datetime(2017, 6, 23, 0, 0) + assert get_next_datetime('* * * * sun-fri', t) == datetime.datetime(2017, 6, 19, 10, 17) + assert get_next_datetime('* * * * fri-sun', t) == datetime.datetime(2017, 6, 23, 0, 0) + + assert get_next_datetime('* * * * 0-7', t) == datetime.datetime(2017, 6, 19, 10, 17) + + with pytest.raises(Exception): + get_next_datetime('* * * * wed-mon', t) + + +def test_days_with_isoweekday() -> None: + t = datetime.datetime(2017, 8, 24, 18, 1, 0) + assert get_next_datetime('0 10 1-7 * *', t) == datetime.datetime(2017, 9, 1, 10, 0) + assert get_next_datetime('0 10 * * mon', t) == datetime.datetime(2017, 8, 28, 10, 0) + assert get_next_datetime('0 10 1-7 * mon', t) == datetime.datetime(2017, 8, 28, 10, 0) + assert get_next_datetime('0 10 1-25 * mon', t) == datetime.datetime(2017, 8, 25, 10, 0) + assert get_next_datetime('0 * 1-25 * mon', t) == datetime.datetime(2017, 8, 24, 19, 0) + assert get_next_datetime('0 * 1-24 * mon', t) == datetime.datetime(2017, 8, 24, 19, 0) + assert get_next_datetime('0 * 1-23 * mon', t) == datetime.datetime(2017, 8, 28, 0, 0) + + def test_timezones() -> None: t = datetime.datetime(2017, 6, 15, 10, 16, 50, tzinfo=pytz.UTC) assert get_next_datetime('* * * * *', t) == datetime.datetime(2017, 6, 15, 10, 17, tzinfo=pytz.UTC) @@ -56,16 +95,28 @@ def test_advanced_parsing() -> None: assert get_next_datetime('* * * * Lwed-fri', t) == datetime.datetime(2017, 6, 28) assert get_next_datetime('* * 4-15 feb-jun wed-fri', t) == datetime.datetime(2017, 6, 15, 10, 17) - assert get_next_datetime('3-20/2 5 4-15 feb-may wed-fri', t) == datetime.datetime(2018, 2, 7, 5, 3) + assert get_next_datetime('3-20/2 5 4-15 feb-may *', t) == datetime.datetime(2018, 2, 4, 5, 3) + assert get_next_datetime('3-20/2 5 4-15 feb-may wed', t) == datetime.datetime(2018, 2, 4, 5, 3) + assert get_next_datetime('3-20/2 5 10-15 feb-may wed', t) == datetime.datetime(2018, 2, 7, 5, 3) + assert get_next_datetime('3-20/2 5 * feb-may wed', t) == datetime.datetime(2018, 2, 7, 5, 3) + assert get_next_datetime('3-20/2 5 4-15 feb-may mon,tue-wed', t) == datetime.datetime(2018, 2, 4, 5, 3) + assert get_next_datetime('3-20/2 5 4-15 feb-may wed-fri', t) == datetime.datetime(2018, 2, 1, 5, 3) + assert get_next_datetime('3-20/2 5 7-15 feb-may wed,tue,sat,mon', t) == datetime.datetime(2018, 2, 3, 5, 3) assert get_next_datetime('* * 29 2 *', t) == datetime.datetime(2020, 2, 29, 0, 0) - assert get_next_datetime('* * 29 2 0', t) == datetime.datetime(2032, 2, 29, 0, 0) + assert get_next_datetime('* * 29 2 0', t) == datetime.datetime(2018, 2, 4, 0, 0) + assert get_next_datetime('* * 29 2 0 2020', t) == datetime.datetime(2020, 2, 2, 0, 0) assert get_next_datetime('30 5 * jan,mar Ltue', t) == datetime.datetime(2018, 1, 30, 5, 30) t = datetime.datetime(2011, 1, 10, 23, 59, 30) assert get_next_datetime('0 0 1 jan/2 * 2011-2013', t) == datetime.datetime(2011, 3, 1) + t = datetime.datetime(2017, 6, 15, 10, 16, 50) + assert get_next_datetime('* * 29 2 mon 2048', t) == datetime.datetime(2048, 2, 3, 0, 0) + assert get_next_datetime('* * 29 2 * 2048', t) == datetime.datetime(2048, 2, 29, 0, 0) + assert get_next_datetime('* * 29 2 * 2049,2051-2060', t) == datetime.datetime(2052, 2, 29, 0, 0) + def test_impossible_dates() -> None: t = datetime.datetime(2017, 6, 15, 10, 16, 50) @@ -74,12 +125,23 @@ def test_impossible_dates() -> None: with pytest.raises(Exception): get_next_datetime('* * 29 2 * 2017-2019', t) - assert get_next_datetime('* * 29 2 mon 2048', t) is None - - assert get_next_datetime('* * 29 2 mon 2048-2070,2073-2200', t) is None + with pytest.raises(Exception): + get_next_datetime('* * 29 2 * 2049-2050,2073-2075,2099-2101', t) with pytest.raises(Exception): get_next_datetime('70 * * *', t) with pytest.raises(Exception): get_next_datetime('* * 30 2 *', t) + + with pytest.raises(Exception): + get_next_datetime('* * * * tue-mon', t) + + with pytest.raises(Exception): + get_next_datetime('* * * x-dec *', t) + + with pytest.raises(Exception): + get_next_datetime('* * * jan-y *', t) + + with pytest.raises(Exception): + get_next_datetime('* * * nope *', t) diff --git a/tomodachi/helpers/crontab.py b/tomodachi/helpers/crontab.py index 982bbd614..e121ebbe1 100644 --- a/tomodachi/helpers/crontab.py +++ b/tomodachi/helpers/crontab.py @@ -8,7 +8,7 @@ ('hour', (0, 23), {}), ('day', (1, 31), {}), ('month', (1, 12), {'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6, 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12}), - ('isoweekday', (0, 6), {'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5, 'sat': 6, 'sun': 0}), + ('isoweekday', (0, 7), {'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5, 'sat': 6, 'sun': 0}), ('year', (1970, 2099), {}) ] # type: List[Tuple[str, Tuple[int, int], Dict[str, int]]] crontab_aliases = { @@ -30,10 +30,18 @@ def get_next_datetime(crontab_notation: str, now_date: datetime.datetime) -> Opt values = [] last_day = False last_weekday = False + use_weekdays = False + use_days = False for i, attr in enumerate(cron_attributes): cron_type, cron_range, aliases = attr # type: str, Tuple, Dict[str, int] available_values = [] # type: Union[List[int], set] parts = cron_parts[i].lower().split(',') + + if attr[0] == 'isoweekday' and cron_parts[i] != '*': + use_weekdays = True + if attr[0] == 'day' and cron_parts[i] != '*': + use_days = True + for part in parts: last = False parsed = False @@ -47,6 +55,12 @@ def get_next_datetime(crontab_notation: str, now_date: datetime.datetime) -> Opt a_value = a_value[1:] a = int(aliases.get(a_value, -1)) b = int(aliases.get(b_value, -1)) + + if attr[0] == 'isoweekday' and b < a: + if b_value == 'sun': + b = 7 + else: + raise Exception('Invalid cron notation: invalid values for {} ({})'.format(attr[0], part)) if a < 0: try: a = int(a_value) @@ -106,6 +120,9 @@ def get_next_datetime(crontab_notation: str, now_date: datetime.datetime) -> Opt if last and attr[0] == 'isoweekday': last_weekday = True + if attr[0] == 'isoweekday': + possible_values = [x if x != 7 else 0 for x in possible_values] + if not possible_values: raise Exception('Invalid cron notation: invalid values for {}'.format(attr[0])) if isinstance(available_values, list): @@ -127,6 +144,8 @@ def calculate_date(input_date: datetime.datetime, last_day: bool, last_weekday: next_date = input_date # type: Optional[datetime.datetime] while True: original_date = next_date + next_date_weekday = next_date + for i, attr in enumerate(cron_attributes): if attr[0] == 'isoweekday': continue @@ -144,15 +163,49 @@ def calculate_date(input_date: datetime.datetime, last_day: bool, last_weekday: next_date = None break - if next_date and (next_date.isoweekday() % 7) not in values[4]: + for i, attr in enumerate(cron_attributes): + if attr[0] == 'isoweekday' or attr[0] == 'day': + continue + value = getattr(next_date_weekday, attr[0]) + possible_values = [v for v in values[i] if v >= value] + if not possible_values: + if attr[0] == 'year': + return None + next_date_weekday = None + break + new_value = min(possible_values) + try: + next_date_weekday = tz.localize(datetime.datetime(*[getattr(next_date_weekday, dv) if dv != attr[0] else new_value for dv in ['year', 'month', 'day', 'hour', 'minute']])) + except ValueError: + next_date_weekday = None + break + + if use_weekdays and next_date_weekday: + for d in range(next_date_weekday.day, monthrange(next_date_weekday.year, next_date_weekday.month)[1] + 1): + next_date_weekday = tz.localize(datetime.datetime(*[getattr(next_date_weekday, dv) if dv != 'day' else d for dv in ['year', 'month', 'day', 'hour', 'minute']])) + if next_date_weekday and (next_date_weekday.isoweekday() % 7) in values[4]: + break + + if use_days and not use_weekdays: + next_date_weekday = None + if not use_days and use_weekdays: next_date = None + + if use_weekdays and next_date_weekday and (next_date_weekday.isoweekday() % 7) not in values[4] and next_date_weekday.isoweekday() not in values[4]: + next_date_weekday = None if next_date and last_day and next_date.day != monthrange(next_date.year, next_date.month)[1]: next_date = None - if next_date and last_weekday: - for i in range(next_date.day + 1, monthrange(next_date.year, next_date.month)[1] + 1): - if datetime.datetime(next_date.year, next_date.month, i).isoweekday() == next_date.isoweekday(): - next_date = None + if use_weekdays and next_date_weekday and last_weekday: + for i in range(next_date_weekday.day + 1, monthrange(next_date_weekday.year, next_date_weekday.month)[1] + 1): + if datetime.datetime(next_date_weekday.year, next_date_weekday.month, i).isoweekday() == next_date_weekday.isoweekday(): + next_date_weekday = None break + + if use_weekdays and not next_date and next_date_weekday: + next_date = next_date_weekday + elif use_weekdays and next_date and next_date_weekday and next_date_weekday < next_date: + next_date = next_date_weekday + if not next_date: next_date = original_date if next_date: