Skip to content

Commit

Permalink
Ensure priority weight is capped at 32-bit integer to prevent roll-ov…
Browse files Browse the repository at this point in the history
…er (apache#43611)

* Ensure priority weight is capped at 32-bit integer to prevent roll-over

* Add newsfragment

* Move range check post type check

* Review feedback - consolidate to single implementation for now
  • Loading branch information
jscheffl authored Nov 14, 2024
1 parent fe4aafe commit ab529d1
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 9 deletions.
15 changes: 9 additions & 6 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.task_group import MappedTaskGroup
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.weight_rule import WeightRule
from airflow.utils.weight_rule import WeightRule, db_safe_priority

if TYPE_CHECKING:
from collections.abc import Mapping
Expand Down Expand Up @@ -335,7 +335,7 @@ def priority_weight_total(self) -> int:
)

if isinstance(self.weight_rule, _AbsolutePriorityWeightStrategy):
return self.priority_weight
return db_safe_priority(self.priority_weight)
elif isinstance(self.weight_rule, _DownstreamPriorityWeightStrategy):
upstream = False
elif isinstance(self.weight_rule, _UpstreamPriorityWeightStrategy):
Expand All @@ -344,10 +344,13 @@ def priority_weight_total(self) -> int:
upstream = False
dag = self.get_dag()
if dag is None:
return self.priority_weight
return self.priority_weight + sum(
dag.task_dict[task_id].priority_weight
for task_id in self.get_flat_relative_ids(upstream=upstream)
return db_safe_priority(self.priority_weight)
return db_safe_priority(
self.priority_weight
+ sum(
dag.task_dict[task_id].priority_weight
for task_id in self.get_flat_relative_ids(upstream=upstream)
)
)

@cached_property
Expand Down
5 changes: 4 additions & 1 deletion airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,8 @@ class derived from this one results in the creation of a task object,
This allows the executor to trigger higher priority tasks before
others when things get backed up. Set priority_weight as a higher
number for more important tasks.
As not all database engines support 64-bit integers, values are capped with 32-bit.
Valid range is from -2,147,483,648 to 2,147,483,647.
:param weight_rule: weighting method used for the effective total
priority weight of the task. Options are:
``{ downstream | upstream | absolute }`` default is ``downstream``
Expand All @@ -494,7 +496,8 @@ class derived from this one results in the creation of a task object,
Additionally, when set to ``absolute``, there is bonus effect of
significantly speeding up the task creation process as for very large
DAGs. Options can be set as string or using the constants defined in
the static class ``airflow.utils.WeightRule``
the static class ``airflow.utils.WeightRule``.
Irrespective of the weight rule, resulting priority values are capped with 32-bit.
|experimental|
Since 2.9.0, Airflow allows to define custom priority weight strategy,
by creating a subclass of
Expand Down
12 changes: 12 additions & 0 deletions airflow/utils/weight_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@

import methodtools

# Databases do not support arbitrary precision integers, so we need to limit the range of priority weights.
# postgres: -2147483648 to +2147483647 (see https://www.postgresql.org/docs/current/datatype-numeric.html)
# mysql: -2147483648 to +2147483647 (see https://dev.mysql.com/doc/refman/8.4/en/integer-types.html)
# sqlite: -9223372036854775808 to +9223372036854775807 (see https://sqlite.org/datatype3.html)
DB_SAFE_MINIMUM = -2147483648
DB_SAFE_MAXIMUM = 2147483647


def db_safe_priority(priority_weight: int) -> int:
"""Convert priority weight to a safe value for the database."""
return max(DB_SAFE_MINIMUM, min(DB_SAFE_MAXIMUM, priority_weight))


class WeightRule(str, Enum):
"""Weight rules."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ Below are the weighting methods. By default, Airflow's weighting method is ``dow

The ``priority_weight`` parameter can be used in conjunction with :ref:`concepts:pool`.

.. note::

As most database engines are using 32-bit for integers, the maximum value for any calculated or
defined ``priority_weight`` is 2,147,483,647 and the minimum value is -2,147,483,648.


Custom Weight Rule
------------------

Expand Down
6 changes: 6 additions & 0 deletions newsfragments/43611.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
TaskInstance ``priority_weight`` is capped in 32-bit signed integer ranges.

Some database engines are limited to 32-bit integer values. As some users reported errors in
weight rolled-over to negative values, we decided to cap the value to the 32-bit integer. Even
if internally in python smaller or larger values to 64 bit are supported, ``priority_weight`` is
capped and only storing values from -2147483648 to 2147483647.
6 changes: 6 additions & 0 deletions task_sdk/src/airflow/sdk/definitions/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@
DEFAULT_OWNER: str = "airflow"
DEFAULT_POOL_SLOTS: int = 1
DEFAULT_PRIORITY_WEIGHT: int = 1
# Databases do not support arbitrary precision integers, so we need to limit the range of priority weights.
# postgres: -2147483648 to +2147483647 (see https://www.postgresql.org/docs/current/datatype-numeric.html)
# mysql: -2147483648 to +2147483647 (see https://dev.mysql.com/doc/refman/8.4/en/integer-types.html)
# sqlite: -9223372036854775808 to +9223372036854775807 (see https://sqlite.org/datatype3.html)
MINIMUM_PRIORITY_WEIGHT: int = -2147483648
MAXIMUM_PRIORITY_WEIGHT: int = 2147483647
DEFAULT_EXECUTOR: str | None = None
DEFAULT_QUEUE: str = "default"
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
Expand Down
10 changes: 9 additions & 1 deletion task_sdk/src/airflow/sdk/definitions/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from airflow.utils.setup_teardown import SetupTeardownContext
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import AttributeRemoved
from airflow.utils.weight_rule import db_safe_priority

T = TypeVar("T", bound=FunctionType)

Expand Down Expand Up @@ -364,6 +365,8 @@ class derived from this one results in the creation of a task object,
This allows the executor to trigger higher priority tasks before
others when things get backed up. Set priority_weight as a higher
number for more important tasks.
As not all database engines support 64-bit integers, values are capped with 32-bit.
Valid range is from -2,147,483,648 to 2,147,483,647.
:param weight_rule: weighting method used for the effective total
priority weight of the task. Options are:
``{ downstream | upstream | absolute }`` default is ``downstream``
Expand All @@ -385,7 +388,8 @@ class derived from this one results in the creation of a task object,
Additionally, when set to ``absolute``, there is bonus effect of
significantly speeding up the task creation process as for very large
DAGs. Options can be set as string or using the constants defined in
the static class ``airflow.utils.WeightRule``
the static class ``airflow.utils.WeightRule``.
Irrespective of the weight rule, resulting priority values are capped with 32-bit.
|experimental|
Since 2.9.0, Airflow allows to define custom priority weight strategy,
by creating a subclass of
Expand Down Expand Up @@ -869,6 +873,10 @@ def __init__(

validate_instance_args(self, BASEOPERATOR_ARGS_EXPECTED_TYPES)

# Ensure priority_weight is within the valid range
# Note: Cross-import from airflow.utils to be cleaned up later
self.priority_weight = db_safe_priority(self.priority_weight)

def __eq__(self, other):
if type(self) is type(other):
# Use getattr() instead of __dict__ as __dict__ doesn't return
Expand Down
9 changes: 8 additions & 1 deletion tests/utils/test_weight_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@

import pytest

from airflow.utils.weight_rule import WeightRule
from airflow.utils.weight_rule import DB_SAFE_MAXIMUM, DB_SAFE_MINIMUM, WeightRule, db_safe_priority


def test_db_safe_priority():
assert db_safe_priority(1) == 1
assert db_safe_priority(-1) == -1
assert db_safe_priority(9999999999) == DB_SAFE_MAXIMUM
assert db_safe_priority(-9999999999) == DB_SAFE_MINIMUM


class TestWeightRule:
Expand Down

0 comments on commit ab529d1

Please sign in to comment.