diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index feafb0b6b637..58c2aec6fdeb 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -38,7 +38,7 @@ from airflow.utils.state import State, TaskInstanceState from airflow.utils.task_group import MappedTaskGroup from airflow.utils.trigger_rule import TriggerRule -from airflow.utils.weight_rule import WeightRule +from airflow.utils.weight_rule import WeightRule, db_safe_priority if TYPE_CHECKING: from collections.abc import Mapping @@ -335,7 +335,7 @@ def priority_weight_total(self) -> int: ) if isinstance(self.weight_rule, _AbsolutePriorityWeightStrategy): - return self.priority_weight + return db_safe_priority(self.priority_weight) elif isinstance(self.weight_rule, _DownstreamPriorityWeightStrategy): upstream = False elif isinstance(self.weight_rule, _UpstreamPriorityWeightStrategy): @@ -344,10 +344,13 @@ def priority_weight_total(self) -> int: upstream = False dag = self.get_dag() if dag is None: - return self.priority_weight - return self.priority_weight + sum( - dag.task_dict[task_id].priority_weight - for task_id in self.get_flat_relative_ids(upstream=upstream) + return db_safe_priority(self.priority_weight) + return db_safe_priority( + self.priority_weight + + sum( + dag.task_dict[task_id].priority_weight + for task_id in self.get_flat_relative_ids(upstream=upstream) + ) ) @cached_property diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index c1448ef9cc55..fa26a2026f5d 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -473,6 +473,8 @@ class derived from this one results in the creation of a task object, This allows the executor to trigger higher priority tasks before others when things get backed up. Set priority_weight as a higher number for more important tasks. + As not all database engines support 64-bit integers, values are capped with 32-bit. + Valid range is from -2,147,483,648 to 2,147,483,647. :param weight_rule: weighting method used for the effective total priority weight of the task. Options are: ``{ downstream | upstream | absolute }`` default is ``downstream`` @@ -494,7 +496,8 @@ class derived from this one results in the creation of a task object, Additionally, when set to ``absolute``, there is bonus effect of significantly speeding up the task creation process as for very large DAGs. Options can be set as string or using the constants defined in - the static class ``airflow.utils.WeightRule`` + the static class ``airflow.utils.WeightRule``. + Irrespective of the weight rule, resulting priority values are capped with 32-bit. |experimental| Since 2.9.0, Airflow allows to define custom priority weight strategy, by creating a subclass of diff --git a/airflow/utils/weight_rule.py b/airflow/utils/weight_rule.py index a63358b0322c..490bcfbe8884 100644 --- a/airflow/utils/weight_rule.py +++ b/airflow/utils/weight_rule.py @@ -21,6 +21,18 @@ import methodtools +# Databases do not support arbitrary precision integers, so we need to limit the range of priority weights. +# postgres: -2147483648 to +2147483647 (see https://www.postgresql.org/docs/current/datatype-numeric.html) +# mysql: -2147483648 to +2147483647 (see https://dev.mysql.com/doc/refman/8.4/en/integer-types.html) +# sqlite: -9223372036854775808 to +9223372036854775807 (see https://sqlite.org/datatype3.html) +DB_SAFE_MINIMUM = -2147483648 +DB_SAFE_MAXIMUM = 2147483647 + + +def db_safe_priority(priority_weight: int) -> int: + """Convert priority weight to a safe value for the database.""" + return max(DB_SAFE_MINIMUM, min(DB_SAFE_MAXIMUM, priority_weight)) + class WeightRule(str, Enum): """Weight rules.""" diff --git a/docs/apache-airflow/administration-and-deployment/priority-weight.rst b/docs/apache-airflow/administration-and-deployment/priority-weight.rst index dd61d25fcd4e..7bdeff645026 100644 --- a/docs/apache-airflow/administration-and-deployment/priority-weight.rst +++ b/docs/apache-airflow/administration-and-deployment/priority-weight.rst @@ -63,6 +63,12 @@ Below are the weighting methods. By default, Airflow's weighting method is ``dow The ``priority_weight`` parameter can be used in conjunction with :ref:`concepts:pool`. +.. note:: + + As most database engines are using 32-bit for integers, the maximum value for any calculated or + defined ``priority_weight`` is 2,147,483,647 and the minimum value is -2,147,483,648. + + Custom Weight Rule ------------------ diff --git a/newsfragments/43611.significant.rst b/newsfragments/43611.significant.rst new file mode 100644 index 000000000000..e25fb2a5bba4 --- /dev/null +++ b/newsfragments/43611.significant.rst @@ -0,0 +1,6 @@ +TaskInstance ``priority_weight`` is capped in 32-bit signed integer ranges. + +Some database engines are limited to 32-bit integer values. As some users reported errors in +weight rolled-over to negative values, we decided to cap the value to the 32-bit integer. Even +if internally in python smaller or larger values to 64 bit are supported, ``priority_weight`` is +capped and only storing values from -2147483648 to 2147483647. diff --git a/task_sdk/src/airflow/sdk/definitions/abstractoperator.py b/task_sdk/src/airflow/sdk/definitions/abstractoperator.py index 524ff78b727e..0e51a9748e82 100644 --- a/task_sdk/src/airflow/sdk/definitions/abstractoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/abstractoperator.py @@ -43,6 +43,12 @@ DEFAULT_OWNER: str = "airflow" DEFAULT_POOL_SLOTS: int = 1 DEFAULT_PRIORITY_WEIGHT: int = 1 +# Databases do not support arbitrary precision integers, so we need to limit the range of priority weights. +# postgres: -2147483648 to +2147483647 (see https://www.postgresql.org/docs/current/datatype-numeric.html) +# mysql: -2147483648 to +2147483647 (see https://dev.mysql.com/doc/refman/8.4/en/integer-types.html) +# sqlite: -9223372036854775808 to +9223372036854775807 (see https://sqlite.org/datatype3.html) +MINIMUM_PRIORITY_WEIGHT: int = -2147483648 +MAXIMUM_PRIORITY_WEIGHT: int = 2147483647 DEFAULT_EXECUTOR: str | None = None DEFAULT_QUEUE: str = "default" DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False diff --git a/task_sdk/src/airflow/sdk/definitions/baseoperator.py b/task_sdk/src/airflow/sdk/definitions/baseoperator.py index fc16682a63cd..1eae39576c8e 100644 --- a/task_sdk/src/airflow/sdk/definitions/baseoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/baseoperator.py @@ -60,6 +60,7 @@ from airflow.utils.setup_teardown import SetupTeardownContext from airflow.utils.trigger_rule import TriggerRule from airflow.utils.types import AttributeRemoved +from airflow.utils.weight_rule import db_safe_priority T = TypeVar("T", bound=FunctionType) @@ -364,6 +365,8 @@ class derived from this one results in the creation of a task object, This allows the executor to trigger higher priority tasks before others when things get backed up. Set priority_weight as a higher number for more important tasks. + As not all database engines support 64-bit integers, values are capped with 32-bit. + Valid range is from -2,147,483,648 to 2,147,483,647. :param weight_rule: weighting method used for the effective total priority weight of the task. Options are: ``{ downstream | upstream | absolute }`` default is ``downstream`` @@ -385,7 +388,8 @@ class derived from this one results in the creation of a task object, Additionally, when set to ``absolute``, there is bonus effect of significantly speeding up the task creation process as for very large DAGs. Options can be set as string or using the constants defined in - the static class ``airflow.utils.WeightRule`` + the static class ``airflow.utils.WeightRule``. + Irrespective of the weight rule, resulting priority values are capped with 32-bit. |experimental| Since 2.9.0, Airflow allows to define custom priority weight strategy, by creating a subclass of @@ -869,6 +873,10 @@ def __init__( validate_instance_args(self, BASEOPERATOR_ARGS_EXPECTED_TYPES) + # Ensure priority_weight is within the valid range + # Note: Cross-import from airflow.utils to be cleaned up later + self.priority_weight = db_safe_priority(self.priority_weight) + def __eq__(self, other): if type(self) is type(other): # Use getattr() instead of __dict__ as __dict__ doesn't return diff --git a/tests/utils/test_weight_rule.py b/tests/utils/test_weight_rule.py index 73abafe782b8..387bb9b09e46 100644 --- a/tests/utils/test_weight_rule.py +++ b/tests/utils/test_weight_rule.py @@ -19,7 +19,14 @@ import pytest -from airflow.utils.weight_rule import WeightRule +from airflow.utils.weight_rule import DB_SAFE_MAXIMUM, DB_SAFE_MINIMUM, WeightRule, db_safe_priority + + +def test_db_safe_priority(): + assert db_safe_priority(1) == 1 + assert db_safe_priority(-1) == -1 + assert db_safe_priority(9999999999) == DB_SAFE_MAXIMUM + assert db_safe_priority(-9999999999) == DB_SAFE_MINIMUM class TestWeightRule: