Skip to content

Commit

Permalink
replace cronutils
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxim Mityutko committed Jan 10, 2025
1 parent c652522 commit 2f5a4d2
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 116 deletions.
1 change: 0 additions & 1 deletion brickflow/cli/entrypoint.template
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ def main() -> None:
git_repo="{{ git_https_url }}",
provider="{{ git_provider }}",
libraries=[
MavenTaskLibrary(coordinates="com.cronutils:cron-utils:9.2.0"),
# PypiTaskLibrary(package="spark-expectations=={{spark_expectations_version}}"), # Uncomment if spark-expectations is needed
],
) as f:
Expand Down
5 changes: 0 additions & 5 deletions brickflow/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1137,10 +1137,6 @@ def filter_bf_related_libraries(
if isinstance(lib, PypiTaskLibrary):
if lib.package.startswith("apache-airflow") is True:
continue
if isinstance(lib, MavenTaskLibrary):
# TODO: clean this up but no one should really be using cron-utils at the moment for outside of brickflow
if lib.coordinates.startswith("com.cronutils:cron-utils:9.2.0") is True:
continue
resp.append(lib)
return resp

Expand Down Expand Up @@ -1212,7 +1208,6 @@ def get_brickflow_libraries(enable_plugins: bool = False) -> List[TaskLibrary]:
PypiTaskLibrary("tableauserverclient==0.25"),
PypiTaskLibrary("boxsdk==3.9.2"),
PypiTaskLibrary("cerberus-python-client==2.5.4"),
MavenTaskLibrary("com.cronutils:cron-utils:9.2.0"),
]
else:
return [bf_lib]
197 changes: 111 additions & 86 deletions brickflow_plugins/airflow/cronhelper.py
Original file line number Diff line number Diff line change
@@ -1,106 +1,131 @@
import re
import functools
import os
from pathlib import Path

from brickflow_plugins import log

try:
from py4j.protocol import Py4JError
except ImportError:
raise ImportError(
"You must install py4j to use cronhelper, "
"please try pip install py4j. "
"This library is not installed as "
"it is provided by databricks OOTB."
)


class CronHelper:
_jvm = None

def __init__(self):
self._j_cron_mapper = None
self._unix_parser = None
self._quartz_parser = None

@classmethod
def get_jvm(cls):
if cls._jvm is None: # Initialize the JVM only once and cache it
try:
log.info(
"Attempting to load JVM from pip installation of py4j for cronhelper"
)
from py4j.java_gateway import JavaGateway

cron_utils = (
Path(os.path.abspath(__file__)).parent.absolute()
/ "cron-utils-9.2.0.jar"
)
jg = JavaGateway.launch_gateway(classpath=str(cron_utils))
log.info(
"Launched py4j gateway with cronutils jar added to class path from py4j pip installation"
)
cls._jvm = jg.jvm
except Py4JError as e:
if str(e).startswith("Could not find py4j jar"):
log.info(
"Could not find py4j jar, attempting to load JVM from SparkSession"
)
from pyspark.sql import SparkSession

cls._jvm = SparkSession.getActiveSession()._jvm
else:
raise e
return cls._jvm

def _initialize_jvm(self):
jvm = self.get_jvm()

j_cron_parser = jvm.com.cronutils.parser.CronParser
j_cron_definition_builder = (
jvm.com.cronutils.model.definition.CronDefinitionBuilder
)
j_cron_type = jvm.com.cronutils.model.CronType

self._j_cron_mapper = jvm.com.cronutils.mapper.CronMapper
self._unix_parser = j_cron_parser(
j_cron_definition_builder.instanceDefinitionFor(j_cron_type.UNIX)
# Building blocks for recognising "every X units" interval fields in both cron dialects.
# Placeholder embedded in the two replace patterns below; it is swapped for the
# captured interval length when a field is rewritten.
EVERY_X_UNITS_REPLACE_PLACEHOLDER = "%s"
# Matches a whole field of the form 0/<digits> and captures the digits.
QUARTZ_EVERY_X_UNITS_REGEX = re.compile(r"^0/(\d+)$") # For handling 0/5 units
# Matches a whole field of the form */<digits> and captures the digits.
UNIX_EVERY_X_UNITS_REGEX = re.compile(r"^\*/(\d+)$") # For handling */5 units
# Output templates for the two dialects: "0/%s" and "*/%s" respectively.
QUARTZ_EVERY_X_UNITS_REPLACE_PATTERN = f"0/{EVERY_X_UNITS_REPLACE_PLACEHOLDER}"
UNIX_EVERY_X_UNITS_REPLACE_PATTERN = f"*/{EVERY_X_UNITS_REPLACE_PLACEHOLDER}"

@staticmethod
def __get_expression_parts(expression: str) -> list:
    """Split a cron expression into its whitespace-separated fields.

    Unix cron expressions have 5 fields; Quartz cron expressions have 6 or 7,
    because the trailing year field is optional. A 6-field expression is
    padded with ``"*"`` so that callers always see Quartz input as 7 fields.

    Args:
        expression: Raw cron expression, fields separated by whitespace.

    Returns:
        A list of 5 fields (Unix) or 7 fields (Quartz).

    Raises:
        ValueError: If the expression has any other number of fields.
    """
    # split() without an argument collapses runs of whitespace and ignores
    # leading/trailing whitespace. Splitting on a single " " would produce
    # empty fields for "0  12 * * *" or "0 12 * * * ", and a 5-field Unix
    # expression would then be miscounted as a 6-field Quartz one.
    parts = expression.split()

    if len(parts) in (5, 7):
        return parts
    # The year is optional in Quartz; add it to mimic the 7-field form.
    if len(parts) == 6:
        parts.append("*")
        return parts

    raise ValueError("Invalid cron expression!")

@staticmethod
def convert_interval_parts(part: str, is_quartz: bool = False) -> str:
every_x_units_pattern = (
CronHelper.QUARTZ_EVERY_X_UNITS_REGEX
if is_quartz
else CronHelper.UNIX_EVERY_X_UNITS_REGEX
)
self._quartz_parser = j_cron_parser(
j_cron_definition_builder.instanceDefinitionFor(j_cron_type.QUARTZ)
matches = every_x_units_pattern.match(part)
every_x_units_replace_pattern = (
CronHelper.QUARTZ_EVERY_X_UNITS_REPLACE_PATTERN
if is_quartz
else CronHelper.UNIX_EVERY_X_UNITS_REPLACE_PATTERN
)

def _get_unix_parser(self):
if self._unix_parser is None:
self._initialize_jvm()
return self._unix_parser
if matches:
return every_x_units_replace_pattern.replace(
CronHelper.EVERY_X_UNITS_REPLACE_PLACEHOLDER, matches.group(1)
)

def _get_quartz_parser(self):
if self._quartz_parser is None:
self._initialize_jvm()
return self._quartz_parser
return part

@functools.lru_cache(maxsize=128) # cron expression conversion will not change
def unix_to_quartz(self, unix_cron: str) -> str:
unix_parser = self._get_unix_parser()
quartz_expr = (
self._j_cron_mapper.fromUnixToQuartz()
.map(unix_parser.parse(unix_cron))
.asString()
)
log.info("Converted unix cron %s to quartz cron %s", unix_cron, quartz_expr)
return quartz_expr
parts = self.__get_expression_parts(expression=unix_cron)

if len(parts) != 5:
raise ValueError("Invalid Unix cron expression")

minute, hour, dom, month, dow = map(self.convert_interval_parts, parts)

# Converting Unix DOW to Quartz DOW
def shift_days(day: str) -> str:
    """
    Translate one Unix day-of-week token into its Quartz equivalent.

    Quartz DOW starts from 1 (Sunday) while Unix DOW starts from 0 (Sunday),
    so numeric days are shifted up by one. Handles single days, ranges
    ("1-5"), stepped values ("1-5/2", "*/2"), the wildcard and three-letter
    day names in any letter case. Comma-separated lists are split by the
    caller before this function is invoked.

    Raises:
        ValueError: If a token is neither a number, a day name nor "*".
    """
    # The step after "/" is an interval length, not a day number, so only
    # the part before it is shifted.
    if "/" in day:
        base, step = day.split("/", 1)
        return f"{shift_days(day=base)}/{step}"

    if "-" in day:
        return "-".join(shift_days(day=d) for d in day.split("-"))

    if day == "*":
        return day

    # Unix cron Sunday can be represented as 0 or 7, but only as 1 in Quartz cron
    if day in ("0", "7"):
        return "1"

    # Day names mean the same in both dialects; normalise the case only.
    if day.upper() in ("SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT"):
        return day.upper()

    return str(int(day) + 1)

if "," in dow:
quartz_dow = ",".join([shift_days(day=day) for day in dow.split(",")])
elif dow == "*":
quartz_dow = dow
else:
quartz_dow = shift_days(day=dow)

quartz_dom = dom

if dom != "*" and dow == "*":
quartz_dow = "?"
elif dom == "*":
quartz_dom = "?"

quartz_cron = f"0 {minute} {hour} {quartz_dom} {month} {quartz_dow} *"
log.info("Converted unix cron %s to quartz cron %s", unix_cron, quartz_cron)
return quartz_cron

@functools.lru_cache(maxsize=128) # cron expression conversion will not change
def quartz_to_unix(self, quartz_cron: str) -> str:
quartz_parser = self._get_quartz_parser()
unix_expr = (
self._j_cron_mapper.fromQuartzToUnix()
.map(quartz_parser.parse(quartz_cron))
.asString()
parts = self.__get_expression_parts(expression=quartz_cron)

if len(parts) != 7:
raise ValueError("Invalid Quartz cron expression")

# Unix cron expression does not support '?'
parts = [part.replace("?", "*") for part in parts]

_, minute, hour, dom, month, dow, _ = map(
lambda part: self.convert_interval_parts(part, True), parts
)
log.info("Converted quartz cron %s to unix cron %s", quartz_cron, unix_expr)
return unix_expr

# Converting Quartz DOW to Unix DOW
def shift_days(day: str) -> str:
    """
    Translate one Quartz day-of-week token into its Unix equivalent.

    Quartz DOW starts from 1 (Sunday) while Unix DOW starts from 0 (Sunday),
    so numeric days are shifted down by one. Handles single days, ranges
    ("2-6"), stepped values ("2-6/2", "*/2"), the wildcard and three-letter
    day names in any letter case. Comma-separated lists are split by the
    caller before this function is invoked.

    Raises:
        ValueError: If a token is neither a number, a day name nor "*"
            (for example the Quartz-only "L" and "#" forms).
    """
    # The step after "/" is an interval length, not a day number, so only
    # the part before it is shifted.
    if "/" in day:
        base, step = day.split("/", 1)
        return f"{shift_days(day=base)}/{step}"

    if "-" in day:
        return "-".join(shift_days(day=d) for d in day.split("-"))

    if day == "*":
        return day

    # Day names mean the same in both dialects; normalise the case only.
    if day.upper() in ("SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT"):
        return day.upper()

    return str(int(day) - 1)

if "," in dow:
unix_dow = ",".join([shift_days(day=day) for day in dow.split(",")])
elif dow == "*":
unix_dow = "*"
else:
unix_dow = shift_days(day=dow)

unix_dom = dom

unix_cron = f"{minute} {hour} {unix_dom} {month} {unix_dow}"
log.info("Converted quartz cron %s to unix cron %s", quartz_cron, unix_cron)
return unix_cron


cron_helper = CronHelper()
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ targets:
- pypi:
package: cerberus-python-client==2.5.4
repo: null
- maven:
coordinates: com.cronutils:cron-utils:9.2.0
exclusions: null
repo: null
max_retries: null
min_retry_interval_millis: null
notebook_task:
Expand Down
6 changes: 3 additions & 3 deletions tests/engine/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ def test_get_brickflow_lib_version(self):
def test_get_brickflow_libraries(self):
settings = BrickflowProjectDeploymentSettings()
settings.brickflow_project_runtime_version = "1.0.0"
assert len(get_brickflow_libraries(enable_plugins=True)) == 7
assert len(get_brickflow_libraries(enable_plugins=True)) == 6
assert len(get_brickflow_libraries(enable_plugins=False)) == 1
lib = get_brickflow_libraries(enable_plugins=False)[0].dict
expected = {
Expand All @@ -465,7 +465,7 @@ def test_get_brickflow_libraries_semver_non_numeric(self):
settings = BrickflowProjectDeploymentSettings()
tag = "1.0.1rc1234"
settings.brickflow_project_runtime_version = tag
assert len(get_brickflow_libraries(enable_plugins=True)) == 7
assert len(get_brickflow_libraries(enable_plugins=True)) == 6
assert len(get_brickflow_libraries(enable_plugins=False)) == 1
lib = get_brickflow_libraries(enable_plugins=False)[0].dict
expected = {
Expand All @@ -481,7 +481,7 @@ def test_get_brickflow_libraries_non_semver(self):
settings = BrickflowProjectDeploymentSettings()
tag = "somebranch"
settings.brickflow_project_runtime_version = tag
assert len(get_brickflow_libraries(enable_plugins=True)) == 7
assert len(get_brickflow_libraries(enable_plugins=True)) == 6
assert len(get_brickflow_libraries(enable_plugins=False)) == 1
lib = get_brickflow_libraries(enable_plugins=False)[0].dict
expected = {
Expand Down
45 changes: 28 additions & 17 deletions tests/test_plugins.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import copy
from typing import List
from unittest import mock
from unittest.mock import patch

import pluggy
import pytest
Expand Down Expand Up @@ -45,21 +44,33 @@ def test_plugins_ensure_installation_import_error(self):
get_brickflow_tasks_hook(pm)
assert_plugin_manager(pm, ["default"])

def test_cron_import_nopy4j(self):
remove_cron_helper = {
"brickflow_plugins.airflow.cronhelper": None,
"py4j": None,
"py4j.protocol": None,
"py4j.java_gateway": None,
}
with patch.dict("sys.modules", remove_cron_helper):
with pytest.raises(ImportError):
import brickflow_plugins.airflow.cronhelper as cronhelper # noqa

def test_cron_conversion(self):
@pytest.mark.parametrize(
"quartz_cron, expected_unix_cron",
[
("0 * * ? * * *", "* * * * *"),
("0 */5 * ? * * *", "*/5 * * * *"),
("0 30 * ? * * *", "30 * * * *"),
("0 0 12 ? * * *", "0 12 * * *"),
("0 0 12 ? * 2 *", "0 12 * * 1"),
("0 0 0 10 * ? *", "0 0 10 * *"),
("0 0 0 1 1 ? *", "0 0 1 1 *"),
("0 0/5 14,18 * * ?", "0/5 14,18 * * *"),
("0 0 12 ? * 1,2,5-7 *", "0 12 * * 0,1,4-6"),
("0 0 12 ? * SUN,MON,THU-SAT *", "0 12 * * SUN,MON,THU-SAT"),
],
)
def test_cron_conversion(self, quartz_cron, expected_unix_cron):
import brickflow_plugins.airflow.cronhelper as cronhelper # noqa

unix_cron = cronhelper.cron_helper.quartz_to_unix("0 0 12 * * ?")
quartz_cron = cronhelper.cron_helper.unix_to_quartz(unix_cron)
unix_cron_second = cronhelper.cron_helper.quartz_to_unix(quartz_cron)
assert unix_cron == unix_cron_second, "cron conversion should be idempotent"
converted_unix_cron = cronhelper.cron_helper.quartz_to_unix(quartz_cron)
converted_quartz_cron = cronhelper.cron_helper.unix_to_quartz(
converted_unix_cron
)
converted_unix_cron_second = cronhelper.cron_helper.quartz_to_unix(
converted_quartz_cron
)

assert (
converted_unix_cron == converted_unix_cron_second
), "cron conversion should be idempotent"
assert converted_unix_cron == expected_unix_cron

0 comments on commit 2f5a4d2

Please sign in to comment.