From f787cfafbc805786463dae9653f65a5774e8f032 Mon Sep 17 00:00:00 2001 From: Jens Scheffler <95105677+jscheffl@users.noreply.github.com> Date: Sun, 17 Nov 2024 22:19:31 +0100 Subject: [PATCH] Ensure priority weight is capped at 32-bit integer to prevent roll-over (#43611) (#44045) * Ensure priority weight is capped at 32-bit integer to prevent roll-over * Add newsfragment * Move range check post type check * Review feedback - consolidate to single implementation for now (cherry picked from commit ab529d13042c9a9c036cd4a03d04c9aa819adf34) --- airflow/models/abstractoperator.py | 15 +++++++++------ airflow/models/baseoperator.py | 5 ++++- airflow/utils/weight_rule.py | 12 ++++++++++++ .../priority-weight.rst | 6 ++++++ newsfragments/43611.significant.rst | 6 ++++++ tests/utils/test_weight_rule.py | 9 ++++++++- 6 files changed, 45 insertions(+), 8 deletions(-) create mode 100644 newsfragments/43611.significant.rst diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index 45eb3c5fff18..ec3d1f5309ad 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -40,7 +40,7 @@ from airflow.utils.task_group import MappedTaskGroup from airflow.utils.trigger_rule import TriggerRule from airflow.utils.types import NOTSET, ArgNotSet -from airflow.utils.weight_rule import WeightRule +from airflow.utils.weight_rule import WeightRule, db_safe_priority TaskStateChangeCallback = Callable[[Context], None] @@ -467,7 +467,7 @@ def priority_weight_total(self) -> int: ) if isinstance(self.weight_rule, _AbsolutePriorityWeightStrategy): - return self.priority_weight + return db_safe_priority(self.priority_weight) elif isinstance(self.weight_rule, _DownstreamPriorityWeightStrategy): upstream = False elif isinstance(self.weight_rule, _UpstreamPriorityWeightStrategy): @@ -476,10 +476,13 @@ def priority_weight_total(self) -> int: upstream = False dag = self.get_dag() if dag is None: - return self.priority_weight - return 
self.priority_weight + sum( - dag.task_dict[task_id].priority_weight - for task_id in self.get_flat_relative_ids(upstream=upstream) + return db_safe_priority(self.priority_weight) + return db_safe_priority( + self.priority_weight + + sum( + dag.task_dict[task_id].priority_weight + for task_id in self.get_flat_relative_ids(upstream=upstream) + ) ) @cached_property diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 449678860f80..11522060fe06 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -656,6 +656,8 @@ class derived from this one results in the creation of a task object, This allows the executor to trigger higher priority tasks before others when things get backed up. Set priority_weight as a higher number for more important tasks. + As not all database engines support 64-bit integers, values are capped to the 32-bit signed integer range. + Valid range is from -2,147,483,648 to 2,147,483,647. :param weight_rule: weighting method used for the effective total priority weight of the task. Options are: ``{ downstream | upstream | absolute }`` default is ``downstream`` @@ -677,7 +679,8 @@ class derived from this one results in the creation of a task object, Additionally, when set to ``absolute``, there is bonus effect of significantly speeding up the task creation process as for very large DAGs. Options can be set as string or using the constants defined in - the static class ``airflow.utils.WeightRule`` + the static class ``airflow.utils.WeightRule``. + Irrespective of the weight rule, resulting priority values are capped to the 32-bit signed integer range. 
|experimental| Since 2.9.0, Airflow allows to define custom priority weight strategy, by creating a subclass of diff --git a/airflow/utils/weight_rule.py b/airflow/utils/weight_rule.py index a63358b0322c..490bcfbe8884 100644 --- a/airflow/utils/weight_rule.py +++ b/airflow/utils/weight_rule.py @@ -21,6 +21,18 @@ import methodtools +# Databases do not support arbitrary precision integers, so we need to limit the range of priority weights. +# postgres: -2147483648 to +2147483647 (see https://www.postgresql.org/docs/current/datatype-numeric.html) +# mysql: -2147483648 to +2147483647 (see https://dev.mysql.com/doc/refman/8.4/en/integer-types.html) +# sqlite: -9223372036854775808 to +9223372036854775807 (see https://sqlite.org/datatype3.html) +DB_SAFE_MINIMUM = -2147483648 +DB_SAFE_MAXIMUM = 2147483647 + + +def db_safe_priority(priority_weight: int) -> int: + """Convert priority weight to a safe value for the database.""" + return max(DB_SAFE_MINIMUM, min(DB_SAFE_MAXIMUM, priority_weight)) + class WeightRule(str, Enum): """Weight rules.""" diff --git a/docs/apache-airflow/administration-and-deployment/priority-weight.rst b/docs/apache-airflow/administration-and-deployment/priority-weight.rst index dd61d25fcd4e..7bdeff645026 100644 --- a/docs/apache-airflow/administration-and-deployment/priority-weight.rst +++ b/docs/apache-airflow/administration-and-deployment/priority-weight.rst @@ -63,6 +63,12 @@ Below are the weighting methods. By default, Airflow's weighting method is ``dow The ``priority_weight`` parameter can be used in conjunction with :ref:`concepts:pool`. +.. note:: + + As most database engines are using 32-bit for integers, the maximum value for any calculated or + defined ``priority_weight`` is 2,147,483,647 and the minimum value is -2,147,483,648. 
+ + Custom Weight Rule ------------------ diff --git a/newsfragments/43611.significant.rst b/newsfragments/43611.significant.rst new file mode 100644 index 000000000000..e25fb2a5bba4 --- /dev/null +++ b/newsfragments/43611.significant.rst @@ -0,0 +1,6 @@ +TaskInstance ``priority_weight`` is capped to the 32-bit signed integer range. + +Some database engines are limited to 32-bit integer values. As some users reported errors where the +weight rolled over to negative values, we decided to cap the value to the 32-bit signed integer range. Even +though Python itself supports integers well beyond 64 bits, ``priority_weight`` is +capped and only stores values from -2147483648 to 2147483647. diff --git a/tests/utils/test_weight_rule.py b/tests/utils/test_weight_rule.py index 73abafe782b8..387bb9b09e46 100644 --- a/tests/utils/test_weight_rule.py +++ b/tests/utils/test_weight_rule.py @@ -19,7 +19,14 @@ import pytest -from airflow.utils.weight_rule import WeightRule +from airflow.utils.weight_rule import DB_SAFE_MAXIMUM, DB_SAFE_MINIMUM, WeightRule, db_safe_priority + + +def test_db_safe_priority(): + assert db_safe_priority(1) == 1 + assert db_safe_priority(-1) == -1 + assert db_safe_priority(9999999999) == DB_SAFE_MAXIMUM + assert db_safe_priority(-9999999999) == DB_SAFE_MINIMUM class TestWeightRule: