Skip to content

Commit

Permalink
Ensure priority weight is capped at 32-bit integer to prevent roll-ov…
Browse files Browse the repository at this point in the history
…er (apache#43611) (apache#44045)

* Ensure priority weight is capped at 32-bit integer to prevent roll-over

* Add newsfragment

* Move range check post type check

* Review feedback - consolidate to single implementation for now

(cherry picked from commit ab529d1)
  • Loading branch information
jscheffl authored Nov 17, 2024
1 parent 2a32632 commit f787cfa
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 8 deletions.
15 changes: 9 additions & 6 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from airflow.utils.task_group import MappedTaskGroup
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import NOTSET, ArgNotSet
from airflow.utils.weight_rule import WeightRule
from airflow.utils.weight_rule import WeightRule, db_safe_priority

TaskStateChangeCallback = Callable[[Context], None]

Expand Down Expand Up @@ -467,7 +467,7 @@ def priority_weight_total(self) -> int:
)

if isinstance(self.weight_rule, _AbsolutePriorityWeightStrategy):
return self.priority_weight
return db_safe_priority(self.priority_weight)
elif isinstance(self.weight_rule, _DownstreamPriorityWeightStrategy):
upstream = False
elif isinstance(self.weight_rule, _UpstreamPriorityWeightStrategy):
Expand All @@ -476,10 +476,13 @@ def priority_weight_total(self) -> int:
upstream = False
dag = self.get_dag()
if dag is None:
return self.priority_weight
return self.priority_weight + sum(
dag.task_dict[task_id].priority_weight
for task_id in self.get_flat_relative_ids(upstream=upstream)
return db_safe_priority(self.priority_weight)
return db_safe_priority(
self.priority_weight
+ sum(
dag.task_dict[task_id].priority_weight
for task_id in self.get_flat_relative_ids(upstream=upstream)
)
)

@cached_property
Expand Down
5 changes: 4 additions & 1 deletion airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,8 @@ class derived from this one results in the creation of a task object,
This allows the executor to trigger higher priority tasks before
others when things get backed up. Set priority_weight as a higher
number for more important tasks.
As not all database engines support 64-bit integers, values are capped with 32-bit.
Valid range is from -2,147,483,648 to 2,147,483,647.
:param weight_rule: weighting method used for the effective total
priority weight of the task. Options are:
``{ downstream | upstream | absolute }`` default is ``downstream``
Expand All @@ -677,7 +679,8 @@ class derived from this one results in the creation of a task object,
Additionally, when set to ``absolute``, there is bonus effect of
significantly speeding up the task creation process as for very large
DAGs. Options can be set as string or using the constants defined in
the static class ``airflow.utils.WeightRule``
the static class ``airflow.utils.WeightRule``.
Irrespective of the weight rule, resulting priority values are capped with 32-bit.
|experimental|
Since 2.9.0, Airflow allows to define custom priority weight strategy,
by creating a subclass of
Expand Down
12 changes: 12 additions & 0 deletions airflow/utils/weight_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@

import methodtools

# Databases do not support arbitrary precision integers, so we need to limit the range of priority weights.
# postgres: -2147483648 to +2147483647 (see https://www.postgresql.org/docs/current/datatype-numeric.html)
# mysql: -2147483648 to +2147483647 (see https://dev.mysql.com/doc/refman/8.4/en/integer-types.html)
# sqlite: -9223372036854775808 to +9223372036854775807 (see https://sqlite.org/datatype3.html)
DB_SAFE_MINIMUM = -2147483648
DB_SAFE_MAXIMUM = 2147483647


def db_safe_priority(priority_weight: int) -> int:
"""Convert priority weight to a safe value for the database."""
return max(DB_SAFE_MINIMUM, min(DB_SAFE_MAXIMUM, priority_weight))


class WeightRule(str, Enum):
"""Weight rules."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ Below are the weighting methods. By default, Airflow's weighting method is ``dow

The ``priority_weight`` parameter can be used in conjunction with :ref:`concepts:pool`.

.. note::

As most database engines are using 32-bit for integers, the maximum value for any calculated or
defined ``priority_weight`` is 2,147,483,647 and the minimum value is -2,147,483,648.


Custom Weight Rule
------------------

Expand Down
6 changes: 6 additions & 0 deletions newsfragments/43611.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
TaskInstance ``priority_weight`` is capped in 32-bit signed integer ranges.

Some database engines are limited to 32-bit integer values. As some users reported errors in
weight rolled-over to negative values, we decided to cap the value to the 32-bit integer. Even
if internally in python smaller or larger values to 64 bit are supported, ``priority_weight`` is
capped and only storing values from -2147483648 to 2147483647.
9 changes: 8 additions & 1 deletion tests/utils/test_weight_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@

import pytest

from airflow.utils.weight_rule import WeightRule
from airflow.utils.weight_rule import DB_SAFE_MAXIMUM, DB_SAFE_MINIMUM, WeightRule, db_safe_priority


def test_db_safe_priority():
assert db_safe_priority(1) == 1
assert db_safe_priority(-1) == -1
assert db_safe_priority(9999999999) == DB_SAFE_MAXIMUM
assert db_safe_priority(-9999999999) == DB_SAFE_MINIMUM


class TestWeightRule:
Expand Down

0 comments on commit f787cfa

Please sign in to comment.