Skip to content

Commit

Permalink
Remove special handling of backfills in scheduler (apache#42678)
Browse files Browse the repository at this point in the history
Before Airflow 3.0, the scheduler would completely ignore all backfill runs. This PR removes that logic so that backfill runs are treated the same as non-backfill runs. This is a baby step toward adding scheduling of backfill DAG runs to the scheduler.
  • Loading branch information
dstandish authored Oct 3, 2024
1 parent 976064d commit 982502e
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 28 deletions.
11 changes: 2 additions & 9 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,11 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
num_starved_tasks = len(starved_tasks)
num_starved_tasks_task_dagrun_concurrency = len(starved_tasks_task_dagrun_concurrency)

# Get task instances associated with scheduled
# DagRuns which are not backfilled, in the given states,
# and the dag is not paused
query = (
select(TI)
.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
.join(TI.dag_run)
.where(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
.where(DR.state == DagRunState.RUNNING)
.join(TI.dag_model)
.where(not_(DM.is_paused))
.where(TI.state == TaskInstanceState.SCHEDULED)
Expand Down Expand Up @@ -1020,7 +1017,6 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
.where(
DagModel.is_paused == expression.true(),
DagRun.state == DagRunState.RUNNING,
DagRun.run_type != DagRunType.BACKFILL_JOB,
)
.having(DagRun.last_scheduling_decision <= func.max(TI.updated_at))
.group_by(DagRun)
Expand Down Expand Up @@ -1838,10 +1834,7 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int:
.join(TI.queued_by_job)
.where(Job.state.is_distinct_from(JobState.RUNNING))
.join(TI.dag_run)
.where(
DagRun.run_type != DagRunType.BACKFILL_JOB,
DagRun.state == DagRunState.RUNNING,
)
.where(DagRun.state == DagRunState.RUNNING)
.options(load_only(TI.dag_id, TI.task_id, TI.run_id))
)

Expand Down
4 changes: 2 additions & 2 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> Query:
query = (
select(cls)
.with_hint(cls, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql")
.where(cls.state == DagRunState.RUNNING, cls.run_type != DagRunType.BACKFILL_JOB)
.where(cls.state == DagRunState.RUNNING)
.join(DagModel, DagModel.dag_id == cls.dag_id)
.where(DagModel.is_paused == false(), DagModel.is_active == true())
.order_by(
Expand Down Expand Up @@ -450,7 +450,7 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> Query:
)
query = (
select(cls)
.where(cls.state == DagRunState.QUEUED, cls.run_type != DagRunType.BACKFILL_JOB)
.where(cls.state == DagRunState.QUEUED)
.join(DagModel, DagModel.dag_id == cls.dag_id)
.where(DagModel.is_paused == false(), DagModel.is_active == true())
.outerjoin(running_drs, running_drs.c.dag_id == DagRun.dag_id)
Expand Down
35 changes: 18 additions & 17 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,11 +580,11 @@ def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker)
assert State.SCHEDULED == ti1.state
session.rollback()

def test_execute_task_instances_backfill_tasks_wont_execute(self, dag_maker):
def test_execute_task_instances_backfill_tasks_will_execute(self, dag_maker):
"""
Tests that backfill tasks won't get executed.
"""
dag_id = "SchedulerJobTest.test_execute_task_instances_backfill_tasks_wont_execute"
dag_id = "SchedulerJobTest.test_execute_task_instances_backfill_tasks_will_execute"
task_id_1 = "dummy_task"

with dag_maker(dag_id=dag_id):
Expand All @@ -606,7 +606,7 @@ def test_execute_task_instances_backfill_tasks_wont_execute(self, dag_maker):
self.job_runner._critical_section_enqueue_task_instances(session)
session.flush()
ti1.refresh_from_db()
assert State.SCHEDULED == ti1.state
assert ti1.state == TaskInstanceState.QUEUED
session.rollback()

@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
Expand Down Expand Up @@ -696,24 +696,25 @@ def test_find_executable_task_instances_backfill(self, dag_maker):
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()

dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.BACKFILL_JOB, state=State.RUNNING)
dr_non_backfill = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
dr_backfill = dag_maker.create_dagrun_after(
dr_non_backfill, run_type=DagRunType.BACKFILL_JOB, state=State.RUNNING
)

ti_backfill = dr_backfill.get_task_instance(task1.task_id)
ti_non_backfill = dr_non_backfill.get_task_instance(task1.task_id)

ti_backfill = dr2.get_task_instance(task1.task_id)
ti_with_dagrun = dr1.get_task_instance(task1.task_id)
# ti_with_paused
ti_backfill.state = State.SCHEDULED
ti_with_dagrun.state = State.SCHEDULED
ti_non_backfill.state = State.SCHEDULED

session.merge(dr2)
session.merge(dr_backfill)
session.merge(ti_backfill)
session.merge(ti_with_dagrun)
session.merge(ti_non_backfill)
session.flush()

res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 1 == len(res)
res_keys = (x.key for x in res)
assert ti_with_dagrun.key in res_keys
queued_tis = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert len(queued_tis) == 2
assert {x.key for x in queued_tis} == {ti_non_backfill.key, ti_backfill.key}
session.rollback()

def test_find_executable_task_instances_pool(self, dag_maker):
Expand Down Expand Up @@ -2124,7 +2125,7 @@ def test_adopt_or_reset_orphaned_tasks(self, dag_maker):
assert ti.state == State.NONE

ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
assert ti2.state == State.QUEUED, "Tasks run by Backfill Jobs should not be reset"
assert ti2.state == State.NONE, "Tasks run by Backfill Jobs should be treated the same"

def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker, mock_executors):
"""Test that with multiple executors configured tasks are sorted correctly and handed off to the
Expand Down Expand Up @@ -5802,7 +5803,7 @@ def test_update_dagrun_state_for_paused_dag_not_for_backfill(self, dag_maker, se
session.flush()

(backfill_run,) = DagRun.find(dag_id=dag.dag_id, run_type=DagRunType.BACKFILL_JOB, session=session)
assert backfill_run.state == State.RUNNING
assert backfill_run.state == State.SUCCESS

def test_asset_orphaning(self, dag_maker, session):
asset1 = Asset(uri="ds1")
Expand Down

0 comments on commit 982502e

Please sign in to comment.