From b9b24f4ee897ea2cc2e27b2cc875b2da1096c9ee Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Fri, 11 Oct 2024 14:37:59 -0700
Subject: [PATCH] Add a "backfill create" command (#42922)

This adds a new command group, "backfill", for management of backfills.
The first action is "create", which creates a backfill. Others, such as
pause and cancel, may follow.
---
 airflow/cli/cli_config.py                   | 137 ++++-------
 airflow/cli/commands/backfill_command.py    |  44 ++++
 airflow/cli/commands/dag_command.py         | 102 +-------
 airflow/executors/executor_loader.py        |   4 +-
 airflow/models/dag.py                       |   3 -
 tests/cli/commands/test_backfill_command.py |  89 +++++++
 tests/cli/commands/test_dag_command.py      | 251 --------------------
 tests/jobs/test_backfill_job.py             |  20 --
 8 files changed, 176 insertions(+), 474 deletions(-)
 create mode 100644 airflow/cli/commands/backfill_command.py
 create mode 100644 tests/cli/commands/test_backfill_command.py

diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 4b42f2b96fb2..e68a464f61bb 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -264,7 +264,7 @@ def string_lower_type(val):
     help="The number of next execution datetimes to show",
 )
 
-# backfill
+# misc
 ARG_MARK_SUCCESS = Arg(
     ("-m", "--mark-success"), help="Mark jobs as succeeded without running them", action="store_true"
 )
@@ -300,78 +300,34 @@ def string_lower_type(val):
 )
 ARG_VERBOSE = Arg(("-v", "--verbose"), help="Make logging output more verbose", action="store_true")
 ARG_LOCAL = Arg(("-l", "--local"), help="Run the task using the LocalExecutor", action="store_true")
-ARG_DONOT_PICKLE = Arg(
-    ("-x", "--donot-pickle"),
-    help=(
-        "Do not attempt to pickle the DAG object to send over "
-        "to the workers, just tell the workers to run their version "
-        "of the code"
-    ),
-    action="store_true",
-)
-ARG_BF_IGNORE_DEPENDENCIES = Arg(
-    ("-i", "--ignore-dependencies"),
-    help=(
-        "Skip upstream tasks, run only the tasks "
-        "matching the regexp. Only works in conjunction "
-        "with task_regex"
-    ),
-    action="store_true",
-)
 ARG_POOL = Arg(("--pool",), "Resource pool to use")
-ARG_DELAY_ON_LIMIT = Arg(
-    ("--delay-on-limit",),
-    help=(
-        "Amount of time in seconds to wait when the limit "
-        "on maximum active dag runs (max_active_runs) has "
-        "been reached before trying to execute a dag run "
-        "again"
-    ),
-    type=float,
-    default=1.0,
-)
-ARG_RESET_DAG_RUN = Arg(
-    ("--reset-dagruns",),
-    help=(
-        "if set, the backfill will delete existing "
-        "backfill-related DAG runs and start "
-        "anew with fresh, running DAG runs"
-    ),
-    action="store_true",
-)
-ARG_RERUN_FAILED_TASKS = Arg(
-    ("--rerun-failed-tasks",),
-    help=(
-        "if set, the backfill will auto-rerun "
-        "all the failed tasks for the backfill date range "
-        "instead of throwing exceptions"
-    ),
-    action="store_true",
-)
-ARG_CONTINUE_ON_FAILURES = Arg(
-    ("--continue-on-failures",),
-    help=("if set, the backfill will keep going even if some of the tasks failed"),
-    action="store_true",
+
+
+# backfill
+ARG_BACKFILL_DAG = Arg(flags=("--dag",), help="The dag to backfill.", required=True)
+ARG_BACKFILL_FROM_DATE = Arg(
+    ("--from-date",), help="Earliest logical date to backfill.", type=parsedate, required=True
 )
-ARG_DISABLE_RETRY = Arg(
-    ("--disable-retry",),
-    help=("if set, the backfill will set tasks as failed without retrying."),
-    action="store_true",
+ARG_BACKFILL_TO_DATE = Arg(
+    ("--to-date",), help="Latest logical date to backfill.", type=parsedate, required=True
 )
+ARG_DAG_RUN_CONF = Arg(flags=("--dag-run-conf",), help="JSON dag run configuration.")
 ARG_RUN_BACKWARDS = Arg(
-    (
-        "-B",
-        "--run-backwards",
-    ),
+    flags=("--run-backwards",),
     help=(
-        "if set, the backfill will run tasks from the most "
-        "recent day first. if there are tasks that depend_on_past "
-        "this option will throw an exception"
+        "If set, the backfill will run tasks from the most recent logical date first. "
+        "Not supported if there are tasks that depend_on_past."
     ),
     action="store_true",
 )
+ARG_MAX_ACTIVE_RUNS = Arg(
+    ("--max-active-runs",),
+    type=positive_int(allow_zero=False),
+    help="Max active runs for this backfill.",
+)
+
+# misc
 ARG_TREAT_DAG_ID_AS_REGEX = Arg(
     ("--treat-dag-id-as-regex",),
     help=("if set, dag_id will be treated as regex instead of an exact string"),
@@ -1056,6 +1012,22 @@ class GroupCommand(NamedTuple):
 
 CLICommand = Union[ActionCommand, GroupCommand]
 
+BACKFILL_COMMANDS = (
+    ActionCommand(
+        name="create",
+        help="Create a backfill for a dag.",
+        description="Create a backfill for a dag over a specified date range.",
+        func=lazy_load_command("airflow.cli.commands.backfill_command.create_backfill"),
+        args=(
+            ARG_BACKFILL_DAG,
+            ARG_BACKFILL_FROM_DATE,
+            ARG_BACKFILL_TO_DATE,
+            ARG_DAG_RUN_CONF,
+            ARG_RUN_BACKWARDS,
+            ARG_MAX_ACTIVE_RUNS,
+        ),
+    ),
+)
 DAGS_COMMANDS = (
     ActionCommand(
         name="details",
@@ -1227,40 +1199,6 @@ class GroupCommand(NamedTuple):
             ARG_VERBOSE,
         ),
     ),
-    ActionCommand(
-        name="backfill",
-        help="Run subsections of a DAG for a specified date range",
-        description=(
-            "Run subsections of a DAG for a specified date range. If reset_dag_run option is used, "
-            "backfill will first prompt users whether airflow should clear all the previous dag_run and "
-            "task_instances within the backfill date range. If rerun_failed_tasks is used, backfill "
-            "will auto re-run the previous failed task instances within the backfill date range"
-        ),
-        func=lazy_load_command("airflow.cli.commands.dag_command.dag_backfill"),
-        args=(
-            ARG_DAG_ID,
-            ARG_TASK_REGEX,
-            ARG_START_DATE,
-            ARG_END_DATE,
-            ARG_MARK_SUCCESS,
-            ARG_LOCAL,
-            ARG_DONOT_PICKLE,
-            ARG_YES,
-            ARG_CONTINUE_ON_FAILURES,
-            ARG_DISABLE_RETRY,
-            ARG_BF_IGNORE_DEPENDENCIES,
-            ARG_SUBDIR,
-            ARG_POOL,
-            ARG_DELAY_ON_LIMIT,
-            ARG_DRY_RUN,
-            ARG_VERBOSE,
-            ARG_CONF,
-            ARG_RESET_DAG_RUN,
-            ARG_RERUN_FAILED_TASKS,
-            ARG_RUN_BACKWARDS,
-            ARG_TREAT_DAG_ID_AS_REGEX,
-        ),
-    ),
     ActionCommand(
         name="test",
         help="Execute one single DagRun",
@@ -1913,6 +1851,11 @@ class GroupCommand(NamedTuple):
         help="Manage DAGs",
         subcommands=DAGS_COMMANDS,
     ),
+    GroupCommand(
+        name="backfill",
+        help="Manage backfills",
+        subcommands=BACKFILL_COMMANDS,
+    ),
     GroupCommand(
         name="tasks",
         help="Manage tasks",
diff --git a/airflow/cli/commands/backfill_command.py b/airflow/cli/commands/backfill_command.py
new file mode 100644
index 000000000000..8714ed558500
--- /dev/null
+++ b/airflow/cli/commands/backfill_command.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+import signal
+
+from airflow import settings
+from airflow.models.backfill import _create_backfill
+from airflow.utils import cli as cli_utils
+from airflow.utils.cli import sigint_handler
+from airflow.utils.providers_configuration_loader import providers_configuration_loaded
+
+
+@cli_utils.action_cli
+@providers_configuration_loaded
+def create_backfill(args) -> None:
+    """Create a backfill for a dag."""
+    logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
+    signal.signal(signal.SIGTERM, sigint_handler)
+
+    _create_backfill(
+        dag_id=args.dag,
+        from_date=args.from_date,
+        to_date=args.to_date,
+        max_active_runs=args.max_active_runs,
+        reverse=args.run_backwards,
+        dag_run_conf=args.dag_run_conf,
+    )
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 069770975476..83d0430a717b 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -23,7 +23,6 @@
 import json
 import logging
 import operator
-import signal
 import subprocess
 import sys
 from typing import TYPE_CHECKING
@@ -31,128 +30,31 @@
 import re2
 from sqlalchemy import delete, select
 
-from airflow import settings
 from airflow.api.client import get_current_api_client
 from airflow.api_connexion.schemas.dag_schema import dag_schema
 from airflow.cli.simple_table import AirflowConsole
-from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.jobs.job import Job
 from airflow.models import DagBag, DagModel, DagRun, TaskInstance
-from airflow.models.dag import DAG
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import cli as cli_utils, timezone
-from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
+from airflow.utils.cli import get_dag, process_subdir, suppress_logs_and_warning
 from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
 from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
 from airflow.utils.helpers import ask_yesno
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import DagRunState
-from airflow.utils.types import DagRunTriggeredByType
 
 if TYPE_CHECKING:
     from graphviz.dot import Dot
     from sqlalchemy.orm import Session
 
+    from airflow.models.dag import DAG
     from airflow.timetables.base import DataInterval
 
-
 log = logging.getLogger(__name__)
 
-def _run_dag_backfill(dags: list[DAG], args) -> None:
-    # If only one date is passed, using same as start and end
-    args.end_date = args.end_date or args.start_date
-    args.start_date = args.start_date or args.end_date
-
-    run_conf = None
-    if args.conf:
-        run_conf = json.loads(args.conf)
-
-    for dag in dags:
-        if args.task_regex:
-            dag = dag.partial_subset(
-                task_ids_or_regex=args.task_regex, include_upstream=not args.ignore_dependencies
-            )
-            if not dag.task_dict:
-                raise AirflowException(
-                    f"There are no tasks that match '{args.task_regex}' regex. Nothing to run, exiting..."
- ) - - if args.dry_run: - print(f"Dry run of DAG {dag.dag_id} on {args.start_date}") - dagrun_infos = dag.iter_dagrun_infos_between(earliest=args.start_date, latest=args.end_date) - for dagrun_info in dagrun_infos: - dr = DagRun( - dag.dag_id, - execution_date=dagrun_info.logical_date, - data_interval=dagrun_info.data_interval, - triggered_by=DagRunTriggeredByType.CLI, - ) - - for task in dag.tasks: - print(f"Task {task.task_id} located in DAG {dag.dag_id}") - ti = TaskInstance(task, run_id=None) - ti.dag_run = dr - ti.dry_run() - else: - if args.reset_dagruns: - DAG.clear_dags( - [dag], - start_date=args.start_date, - end_date=args.end_date, - confirm_prompt=not args.yes, - dag_run_state=DagRunState.QUEUED, - ) - - try: - dag.run( - start_date=args.start_date, - end_date=args.end_date, - mark_success=args.mark_success, - local=args.local, - donot_pickle=(args.donot_pickle or conf.getboolean("core", "donot_pickle")), - ignore_first_depends_on_past=args.ignore_first_depends_on_past, - ignore_task_deps=args.ignore_dependencies, - pool=args.pool, - delay_on_limit_secs=args.delay_on_limit, - verbose=args.verbose, - conf=run_conf, - rerun_failed_tasks=args.rerun_failed_tasks, - run_backwards=args.run_backwards, - continue_on_failures=args.continue_on_failures, - disable_retry=args.disable_retry, - ) - except ValueError as vr: - print(str(vr)) - sys.exit(1) - - -@cli_utils.action_cli -@providers_configuration_loaded -def dag_backfill(args, dag: list[DAG] | DAG | None = None) -> None: - """Create backfill job or dry run for a DAG or list of DAGs using regex.""" - logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT) - signal.signal(signal.SIGTERM, sigint_handler) - args.ignore_first_depends_on_past = True - - if not args.start_date and not args.end_date: - raise AirflowException("Provide a start_date and/or end_date") - - if not dag: - dags = get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_id_as_regex) - elif isinstance(dag, list): - dags = dag - else: - dags = [dag] - del dag - - dags.sort(key=lambda d: d.dag_id) - _run_dag_backfill(dags, args) - if len(dags) > 1: - log.info("All of the backfills are done.") - - @cli_utils.action_cli @providers_configuration_loaded def dag_trigger(args) -> None: diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py index ec79860918b8..4a940793df27 100644 --- a/airflow/executors/executor_loader.py +++ b/airflow/executors/executor_loader.py @@ -171,10 +171,8 @@ def set_default_executor(cls, executor: BaseExecutor) -> None: """ Externally set an executor to be the default. - This is used in rare cases such as dag.run which allows, as a user convenience, to provide + This is used in rare cases such as dag.test which allows, as a user convenience, to provide the executor by cli/argument instead of Airflow configuration - - todo: given comments above, is this needed anymore since DAG.run is removed? 
""" exec_class_name = executor.__class__.__qualname__ exec_name = ExecutorName(f"{executor.__module__}.{exec_class_name}") diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 8d85152677e0..f5def92ea92a 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2293,9 +2293,6 @@ def _remove_task(self, task_id: str) -> None: self.task_count = len(self.task_dict) - def run(self, *args, **kwargs): - """Leaving this here to be removed in other PR for simpler review.""" - def cli(self): """Exposes a CLI specific to this DAG.""" check_cycle(self) diff --git a/tests/cli/commands/test_backfill_command.py b/tests/cli/commands/test_backfill_command.py new file mode 100644 index 000000000000..c01e1e4f9d07 --- /dev/null +++ b/tests/cli/commands/test_backfill_command.py @@ -0,0 +1,89 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import argparse +from datetime import datetime +from unittest import mock + +import pendulum +import pytest + +import airflow.cli.commands.backfill_command +from airflow.cli import cli_parser +from airflow.models import DagBag +from airflow.utils import timezone + +from dev.tests_common.test_utils.db import clear_db_backfills, clear_db_dags, clear_db_runs + +DEFAULT_DATE = timezone.make_aware(datetime(2015, 1, 1), timezone=timezone.utc) +if pendulum.__version__.startswith("3"): + DEFAULT_DATE_REPR = DEFAULT_DATE.isoformat(sep=" ") +else: + DEFAULT_DATE_REPR = DEFAULT_DATE.isoformat() + +# TODO: Check if tests needs side effects - locally there's missing DAG + +pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] + + +class TestCliBackfill: + parser: argparse.ArgumentParser + + @classmethod + def setup_class(cls): + cls.dagbag = DagBag(include_examples=True) + cls.dagbag.sync_to_db() + cls.parser = cli_parser.get_parser() + + @classmethod + def teardown_class(cls) -> None: + clear_db_runs() + clear_db_dags() + clear_db_backfills() + + def setup_method(self): + clear_db_runs() # clean-up all dag run before start each test + clear_db_dags() + clear_db_backfills() + + @mock.patch("airflow.cli.commands.backfill_command._create_backfill") + def test_backfill(self, mock_create): + airflow.cli.commands.backfill_command.create_backfill( + self.parser.parse_args( + [ + "backfill", + "create", + "--dag", + "example_bash_operator", + "--from-date", + DEFAULT_DATE.isoformat(), + "--to-date", + DEFAULT_DATE.isoformat(), + ] + ) + ) + + mock_create.assert_called_once_with( + dag_id="example_bash_operator", + from_date=DEFAULT_DATE, + to_date=DEFAULT_DATE, + max_active_runs=None, + reverse=False, + dag_run_conf=None, + ) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index f0c9a18c1a56..e9d10a2e33c1 100644 --- 
a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -124,139 +124,6 @@ def test_reserialize_should_support_subdir_argument(self): serialized_dags_after_reserialize = session.query(SerializedDagModel).all() assert len(serialized_dags_after_reserialize) == 1 # Serialized DAG back - @mock.patch("airflow.cli.commands.dag_command.DAG.run") - def test_backfill(self, mock_run): - dag_command.dag_backfill( - self.parser.parse_args( - ["dags", "backfill", "example_bash_operator", "--start-date", DEFAULT_DATE.isoformat()] - ) - ) - - mock_run.assert_called_once_with( - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE, - conf=None, - delay_on_limit_secs=1.0, - donot_pickle=False, - ignore_first_depends_on_past=True, - ignore_task_deps=False, - local=False, - mark_success=False, - pool=None, - rerun_failed_tasks=False, - run_backwards=False, - verbose=False, - continue_on_failures=False, - disable_retry=False, - ) - mock_run.reset_mock() - dag = self.dagbag.get_dag("example_bash_operator") - - with contextlib.redirect_stdout(StringIO()) as stdout: - dag_command.dag_backfill( - self.parser.parse_args( - [ - "dags", - "backfill", - "example_bash_operator", - "--task-regex", - "runme_0", - "--dry-run", - "--start-date", - DEFAULT_DATE.isoformat(), - ] - ), - dag=dag, - ) - - output = stdout.getvalue() - assert f"Dry run of DAG example_bash_operator on {DEFAULT_DATE_REPR}\n" in output - assert "Task runme_0 located in DAG example_bash_operator\n" in output - - mock_run.assert_not_called() # Dry run shouldn't run the backfill - - dag_command.dag_backfill( - self.parser.parse_args( - [ - "dags", - "backfill", - "example_bash_operator", - "--dry-run", - "--start-date", - DEFAULT_DATE.isoformat(), - ] - ), - dag=dag, - ) - - mock_run.assert_not_called() # Dry run shouldn't run the backfill - - dag_command.dag_backfill( - self.parser.parse_args( - [ - "dags", - "backfill", - "example_bash_operator", - "--local", - "--start-date", - DEFAULT_DATE.isoformat(), - ] - ), - dag=dag, - ) - - mock_run.assert_called_once_with( - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE, - conf=None, - delay_on_limit_secs=1.0, - donot_pickle=False, - ignore_first_depends_on_past=True, - ignore_task_deps=False, - local=True, - mark_success=False, - pool=None, - rerun_failed_tasks=False, - run_backwards=False, - verbose=False, - continue_on_failures=False, - disable_retry=False, - ) - mock_run.reset_mock() - - with contextlib.redirect_stdout(StringIO()) as stdout: - dag_command.dag_backfill( - self.parser.parse_args( - [ - "dags", - "backfill", - "example_branch_(python_){0,1}operator(_decorator){0,1}", - "--task-regex", - "run_this_first", - "--dry-run", - "--treat-dag-id-as-regex", - "--start-date", - DEFAULT_DATE.isoformat(), - ] - ), - ) - - output = stdout.getvalue() - - assert f"Dry run of DAG example_branch_python_operator_decorator on {DEFAULT_DATE_REPR}\n" in output - assert "Task run_this_first located in DAG example_branch_python_operator_decorator\n" in output - assert f"Dry run of DAG example_branch_operator on {DEFAULT_DATE_REPR}\n" in output - assert "Task run_this_first located in DAG example_branch_operator\n" in output - - @mock.patch("airflow.cli.commands.dag_command.get_dag") - def test_backfill_fails_without_loading_dags(self, mock_get_dag): - cli_args = self.parser.parse_args(["dags", "backfill", "example_bash_operator"]) - - with pytest.raises(AirflowException): - dag_command.dag_backfill(cli_args) - - mock_get_dag.assert_not_called() - def 
test_show_dag_dependencies_print(self): with contextlib.redirect_stdout(StringIO()) as temp_stdout: dag_command.dag_dependencies_show(self.parser.parse_args(["dags", "show-dependencies"])) @@ -314,124 +181,6 @@ def test_show_dag_imgcat(self, mock_render_dag, mock_popen): assert "OUT" in out assert "ERR" in out - @mock.patch("airflow.cli.commands.dag_command.DAG.run") - def test_cli_backfill_ignore_first_depends_on_past(self, mock_run): - """ - Test that CLI respects -I argument - - We just check we call dag.run() right. The behaviour of that kwarg is - tested in test_jobs - """ - dag_id = "example_bash_operator" - run_date = DEFAULT_DATE + timedelta(days=1) - args = [ - "dags", - "backfill", - dag_id, - "--local", - "--start-date", - run_date.isoformat(), - ] - dag = self.dagbag.get_dag(dag_id) - - dag_command.dag_backfill(self.parser.parse_args(args), dag=dag) - - mock_run.assert_called_once_with( - start_date=run_date, - end_date=run_date, - conf=None, - delay_on_limit_secs=1.0, - donot_pickle=False, - ignore_first_depends_on_past=True, - ignore_task_deps=False, - local=True, - mark_success=False, - pool=None, - rerun_failed_tasks=False, - run_backwards=False, - verbose=False, - continue_on_failures=False, - disable_retry=False, - ) - - @pytest.mark.parametrize( - "cli_arg", - [ - pytest.param("-B", id="short"), - pytest.param("--run-backwards", id="full"), - ], - ) - @mock.patch("airflow.cli.commands.dag_command.DAG.run") - def test_cli_backfill_depends_on_past_run_backwards(self, mock_run, cli_arg: str): - """Test that CLI respects -B argument.""" - dag_id = "test_depends_on_past" - start_date = DEFAULT_DATE + timedelta(days=1) - end_date = start_date + timedelta(days=1) - args = [ - "dags", - "backfill", - dag_id, - "--local", - "--start-date", - start_date.isoformat(), - "--end-date", - end_date.isoformat(), - cli_arg, - ] - dag = self.dagbag.get_dag(dag_id) - - dag_command.dag_backfill(self.parser.parse_args(args), dag=dag) - mock_run.assert_called_once_with( - start_date=start_date, - end_date=end_date, - conf=None, - delay_on_limit_secs=1.0, - donot_pickle=False, - ignore_first_depends_on_past=True, - ignore_task_deps=False, - local=True, - mark_success=False, - pool=None, - rerun_failed_tasks=False, - run_backwards=True, - verbose=False, - continue_on_failures=False, - disable_retry=False, - ) - - @mock.patch("airflow.models.taskinstance.TaskInstance.dry_run") - @mock.patch("airflow.cli.commands.dag_command.DagRun") - def test_backfill_with_custom_timetable(self, mock_dagrun, mock_dry_run): - """ - when calling `dags backfill` on dag with custom timetable, the DagRun object should be created with - data_intervals. 
-        """
-
-        start_date = DEFAULT_DATE + timedelta(days=1)
-        end_date = start_date + timedelta(days=1)
-        workdays = [
-            start_date,
-            start_date + timedelta(days=1),
-            start_date + timedelta(days=2),
-        ]
-        cli_args = self.parser.parse_args(
-            [
-                "dags",
-                "backfill",
-                "example_workday_timetable",
-                "--start-date",
-                start_date.isoformat(),
-                "--end-date",
-                end_date.isoformat(),
-                "--dry-run",
-            ]
-        )
-        from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
-
-        with mock.patch.object(AfterWorkdayTimetable, "get_next_workday", side_effect=workdays):
-            dag_command.dag_backfill(cli_args)
-        assert "data_interval" in mock_dagrun.call_args.kwargs
-
     def test_next_execution(self, tmp_path):
         dag_test_list = [
             ("future_schedule_daily", "timedelta(days=5)", "'0 0 * * *'", "True"),
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index dead9be86230..616f328ee417 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -29,7 +29,6 @@
 import pytest
 
 from airflow import settings
-from airflow.cli import cli_parser
 from airflow.exceptions import (
     AirflowException,
     BackfillUnfinished,
@@ -186,7 +185,6 @@ def clean_db():
     @pytest.fixture(autouse=True)
     def set_instance_attrs(self, dag_bag):
         self.clean_db()
-        self.parser = cli_parser.get_parser()
         self.dagbag = dag_bag
         # `airflow tasks run` relies on serialized_dag
         for dag in self.dagbag.dags.values():
@@ -1126,24 +1124,6 @@ def test_backfill_depends_on_past_backwards(self, mock_executor):
         with pytest.raises(AirflowException, match=expected_msg):
             run_job(job=job, execute_callable=job_runner._execute)
 
-    def test_cli_receives_delay_arg(self):
-        """
-        Tests that the --delay argument is passed correctly to the BackfillJob
-        """
-        dag_id = "example_bash_operator"
-        run_date = DEFAULT_DATE
-        args = [
-            "dags",
-            "backfill",
-            dag_id,
-            "-s",
-            run_date.isoformat(),
-            "--delay-on-limit",
-            "0.5",
-        ]
-        parsed_args = self.parser.parse_args(args)
-        assert 0.5 == parsed_args.delay_on_limit
-
     def _get_dag_test_max_active_limits(
         self, dag_maker_fixture, dag_id="test_dag", max_active_runs=1, **kwargs
     ):
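
Usage note (not part of the patch): with this change applied, a backfill would be
created from the shell roughly like this; the dag id, dates, and run limit are
illustrative.

    airflow backfill create \
        --dag example_bash_operator \
        --from-date 2024-01-01 \
        --to-date 2024-01-07 \
        --max-active-runs 2

The same code path can be exercised programmatically through the parser, mirroring
the new test above; create_backfill simply delegates to the internal helper
airflow.models.backfill._create_backfill.

    from airflow.cli import cli_parser
    from airflow.cli.commands import backfill_command

    # Parse the new subcommand exactly as the CLI entrypoint would.
    parser = cli_parser.get_parser()
    args = parser.parse_args(
        [
            "backfill",
            "create",
            "--dag",
            "example_bash_operator",
            "--from-date",
            "2024-01-01",
            "--to-date",
            "2024-01-07",
        ]
    )
    backfill_command.create_backfill(args)  # delegates to _create_backfill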