From 982502efaff13166d597f97e75357998393db423 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 2 Oct 2024 19:31:05 -0700 Subject: [PATCH] Remove special handling of backfills in scheduler (#42678) Before airflow 3.0, scheduler would completely ignore all backfill runs. This PR gets rid of that logic so that backfill runs are treated the same as non-backfill. This is a baby step on the way to adding scheduling of backfill dag runs into the scheduler. --- airflow/jobs/scheduler_job_runner.py | 11 ++------- airflow/models/dagrun.py | 4 ++-- tests/jobs/test_scheduler_job.py | 35 ++++++++++++++-------------- 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index de6ce5019b9d..21c871e95125 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -342,14 +342,11 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) - num_starved_tasks = len(starved_tasks) num_starved_tasks_task_dagrun_concurrency = len(starved_tasks_task_dagrun_concurrency) - # Get task instances associated with scheduled - # DagRuns which are not backfilled, in the given states, - # and the dag is not paused query = ( select(TI) .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql") .join(TI.dag_run) - .where(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING) + .where(DR.state == DagRunState.RUNNING) .join(TI.dag_model) .where(not_(DM.is_paused)) .where(TI.state == TaskInstanceState.SCHEDULED) @@ -1020,7 +1017,6 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION) .where( DagModel.is_paused == expression.true(), DagRun.state == DagRunState.RUNNING, - DagRun.run_type != DagRunType.BACKFILL_JOB, ) .having(DagRun.last_scheduling_decision <= func.max(TI.updated_at)) .group_by(DagRun) @@ -1838,10 +1834,7 @@ def 
adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int: .join(TI.queued_by_job) .where(Job.state.is_distinct_from(JobState.RUNNING)) .join(TI.dag_run) - .where( - DagRun.run_type != DagRunType.BACKFILL_JOB, - DagRun.state == DagRunState.RUNNING, - ) + .where(DagRun.state == DagRunState.RUNNING) .options(load_only(TI.dag_id, TI.task_id, TI.run_id)) ) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 4928c7fcbd8f..d1dbeaf41e58 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -405,7 +405,7 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> Query: query = ( select(cls) .with_hint(cls, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql") - .where(cls.state == DagRunState.RUNNING, cls.run_type != DagRunType.BACKFILL_JOB) + .where(cls.state == DagRunState.RUNNING) .join(DagModel, DagModel.dag_id == cls.dag_id) .where(DagModel.is_paused == false(), DagModel.is_active == true()) .order_by( @@ -450,7 +450,7 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> Query: ) query = ( select(cls) - .where(cls.state == DagRunState.QUEUED, cls.run_type != DagRunType.BACKFILL_JOB) + .where(cls.state == DagRunState.QUEUED) .join(DagModel, DagModel.dag_id == cls.dag_id) .where(DagModel.is_paused == false(), DagModel.is_active == true()) .outerjoin(running_drs, running_drs.c.dag_id == DagRun.dag_id) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 97d84da9c4d5..c8b40f6af7e2 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -580,11 +580,11 @@ def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker) assert State.SCHEDULED == ti1.state session.rollback() - def test_execute_task_instances_backfill_tasks_wont_execute(self, dag_maker): + def test_execute_task_instances_backfill_tasks_will_execute(self, dag_maker): """ - Tests that backfill tasks won't get executed. + Tests that backfill tasks will get executed. 
""" - dag_id = "SchedulerJobTest.test_execute_task_instances_backfill_tasks_wont_execute" + dag_id = "SchedulerJobTest.test_execute_task_instances_backfill_tasks_will_execute" task_id_1 = "dummy_task" with dag_maker(dag_id=dag_id): @@ -606,7 +606,7 @@ def test_execute_task_instances_backfill_tasks_wont_execute(self, dag_maker): self.job_runner._critical_section_enqueue_task_instances(session) session.flush() ti1.refresh_from_db() - assert State.SCHEDULED == ti1.state + assert ti1.state == TaskInstanceState.QUEUED session.rollback() @conf_vars({("scheduler", "standalone_dag_processor"): "False"}) @@ -696,24 +696,25 @@ def test_find_executable_task_instances_backfill(self, dag_maker): self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) session = settings.Session() - dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) - dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.BACKFILL_JOB, state=State.RUNNING) + dr_non_backfill = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) + dr_backfill = dag_maker.create_dagrun_after( + dr_non_backfill, run_type=DagRunType.BACKFILL_JOB, state=State.RUNNING + ) + + ti_backfill = dr_backfill.get_task_instance(task1.task_id) + ti_non_backfill = dr_non_backfill.get_task_instance(task1.task_id) - ti_backfill = dr2.get_task_instance(task1.task_id) - ti_with_dagrun = dr1.get_task_instance(task1.task_id) - # ti_with_paused ti_backfill.state = State.SCHEDULED - ti_with_dagrun.state = State.SCHEDULED + ti_non_backfill.state = State.SCHEDULED - session.merge(dr2) + session.merge(dr_backfill) session.merge(ti_backfill) - session.merge(ti_with_dagrun) + session.merge(ti_non_backfill) session.flush() - res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session) - assert 1 == len(res) - res_keys = (x.key for x in res) - assert ti_with_dagrun.key in res_keys + queued_tis = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session) + assert 
len(queued_tis) == 2 + assert {x.key for x in queued_tis} == {ti_non_backfill.key, ti_backfill.key} session.rollback() def test_find_executable_task_instances_pool(self, dag_maker): @@ -2124,7 +2125,7 @@ def test_adopt_or_reset_orphaned_tasks(self, dag_maker): assert ti.state == State.NONE ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session) - assert ti2.state == State.QUEUED, "Tasks run by Backfill Jobs should not be reset" + assert ti2.state == State.NONE, "Tasks run by Backfill Jobs should be treated the same" def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker, mock_executors): """Test that with multiple executors configured tasks are sorted correctly and handed off to the @@ -5802,7 +5803,7 @@ def test_update_dagrun_state_for_paused_dag_not_for_backfill(self, dag_maker, se session.flush() (backfill_run,) = DagRun.find(dag_id=dag.dag_id, run_type=DagRunType.BACKFILL_JOB, session=session) - assert backfill_run.state == State.RUNNING + assert backfill_run.state == State.SUCCESS def test_asset_orphaning(self, dag_maker, session): asset1 = Asset(uri="ds1")