Skip to content

Commit

Permalink
Add a "backfill create" command (apache#42922)
Browse files Browse the repository at this point in the history
I create a new command group "backfill" for management of backfills. The first action is "create" which creates a backfill. Some others may follow such as pause / cancel.
  • Loading branch information
dstandish authored Oct 11, 2024
1 parent aab1e37 commit b9b24f4
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 474 deletions.
137 changes: 40 additions & 97 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def string_lower_type(val):
help="The number of next execution datetimes to show",
)

# backfill
# misc
ARG_MARK_SUCCESS = Arg(
("-m", "--mark-success"), help="Mark jobs as succeeded without running them", action="store_true"
)
Expand Down Expand Up @@ -300,78 +300,34 @@ def string_lower_type(val):
)
ARG_VERBOSE = Arg(("-v", "--verbose"), help="Make logging output more verbose", action="store_true")
ARG_LOCAL = Arg(("-l", "--local"), help="Run the task using the LocalExecutor", action="store_true")
ARG_DONOT_PICKLE = Arg(
("-x", "--donot-pickle"),
help=(
"Do not attempt to pickle the DAG object to send over "
"to the workers, just tell the workers to run their version "
"of the code"
),
action="store_true",
)
ARG_BF_IGNORE_DEPENDENCIES = Arg(
("-i", "--ignore-dependencies"),
help=(
"Skip upstream tasks, run only the tasks "
"matching the regexp. Only works in conjunction "
"with task_regex"
),
action="store_true",
)
ARG_POOL = Arg(("--pool",), "Resource pool to use")
ARG_DELAY_ON_LIMIT = Arg(
("--delay-on-limit",),
help=(
"Amount of time in seconds to wait when the limit "
"on maximum active dag runs (max_active_runs) has "
"been reached before trying to execute a dag run "
"again"
),
type=float,
default=1.0,
)
ARG_RESET_DAG_RUN = Arg(
("--reset-dagruns",),
help=(
"if set, the backfill will delete existing "
"backfill-related DAG runs and start "
"anew with fresh, running DAG runs"
),
action="store_true",
)
ARG_RERUN_FAILED_TASKS = Arg(
("--rerun-failed-tasks",),
help=(
"if set, the backfill will auto-rerun "
"all the failed tasks for the backfill date range "
"instead of throwing exceptions"
),
action="store_true",
)
ARG_CONTINUE_ON_FAILURES = Arg(
("--continue-on-failures",),
help=("if set, the backfill will keep going even if some of the tasks failed"),
action="store_true",


# backfill
ARG_BACKFILL_DAG = Arg(flags=("--dag",), help="The dag to backfill.", required=True)
ARG_BACKFILL_FROM_DATE = Arg(
("--from-date",), help="Earliest logical date to backfill.", type=parsedate, required=True
)
ARG_DISABLE_RETRY = Arg(
("--disable-retry",),
help=("if set, the backfill will set tasks as failed without retrying."),
action="store_true",
ARG_BACKFILL_TO_DATE = Arg(
("--to-date",), help="Latest logical date to backfill", type=parsedate, required=True
)
ARG_DAG_RUN_CONF = Arg(flags=("--dag-run-conf",), help="JSON dag run configuration.")
ARG_RUN_BACKWARDS = Arg(
(
"-B",
"--run-backwards",
),
flags=("--run-backwards",),
help=(
"if set, the backfill will run tasks from the most "
"recent day first. if there are tasks that depend_on_past "
"this option will throw an exception"
"If set, the backfill will run tasks from the most recent logical date first. "
"Not supported if there are tasks that depend_on_past."
),
action="store_true",
)
ARG_MAX_ACTIVE_RUNS = Arg(
("--max-active-runs",),
type=positive_int(allow_zero=False),
help="Max active runs for this backfill.",
)


# misc
ARG_TREAT_DAG_ID_AS_REGEX = Arg(
("--treat-dag-id-as-regex",),
help=("if set, dag_id will be treated as regex instead of an exact string"),
Expand Down Expand Up @@ -1056,6 +1012,22 @@ class GroupCommand(NamedTuple):

CLICommand = Union[ActionCommand, GroupCommand]

BACKFILL_COMMANDS = (
ActionCommand(
name="create",
help="Create a backfill for a dag.",
description="Run subsections of a DAG for a specified date range.",
func=lazy_load_command("airflow.cli.commands.backfill_command.create_backfill"),
args=(
ARG_BACKFILL_DAG,
ARG_BACKFILL_FROM_DATE,
ARG_BACKFILL_TO_DATE,
ARG_DAG_RUN_CONF,
ARG_RUN_BACKWARDS,
ARG_MAX_ACTIVE_RUNS,
),
),
)
DAGS_COMMANDS = (
ActionCommand(
name="details",
Expand Down Expand Up @@ -1227,40 +1199,6 @@ class GroupCommand(NamedTuple):
ARG_VERBOSE,
),
),
ActionCommand(
name="backfill",
help="Run subsections of a DAG for a specified date range",
description=(
"Run subsections of a DAG for a specified date range. If reset_dag_run option is used, "
"backfill will first prompt users whether airflow should clear all the previous dag_run and "
"task_instances within the backfill date range. If rerun_failed_tasks is used, backfill "
"will auto re-run the previous failed task instances within the backfill date range"
),
func=lazy_load_command("airflow.cli.commands.dag_command.dag_backfill"),
args=(
ARG_DAG_ID,
ARG_TASK_REGEX,
ARG_START_DATE,
ARG_END_DATE,
ARG_MARK_SUCCESS,
ARG_LOCAL,
ARG_DONOT_PICKLE,
ARG_YES,
ARG_CONTINUE_ON_FAILURES,
ARG_DISABLE_RETRY,
ARG_BF_IGNORE_DEPENDENCIES,
ARG_SUBDIR,
ARG_POOL,
ARG_DELAY_ON_LIMIT,
ARG_DRY_RUN,
ARG_VERBOSE,
ARG_CONF,
ARG_RESET_DAG_RUN,
ARG_RERUN_FAILED_TASKS,
ARG_RUN_BACKWARDS,
ARG_TREAT_DAG_ID_AS_REGEX,
),
),
ActionCommand(
name="test",
help="Execute one single DagRun",
Expand Down Expand Up @@ -1913,6 +1851,11 @@ class GroupCommand(NamedTuple):
help="Manage DAGs",
subcommands=DAGS_COMMANDS,
),
GroupCommand(
name="backfill",
help="Manage backfills",
subcommands=BACKFILL_COMMANDS,
),
GroupCommand(
name="tasks",
help="Manage tasks",
Expand Down
44 changes: 44 additions & 0 deletions airflow/cli/commands/backfill_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import logging
import signal

from airflow import settings
from airflow.models.backfill import _create_backfill
from airflow.utils import cli as cli_utils
from airflow.utils.cli import sigint_handler
from airflow.utils.providers_configuration_loader import providers_configuration_loaded


@cli_utils.action_cli
@providers_configuration_loaded
def create_backfill(args) -> None:
"""Create backfill job or dry run for a DAG or list of DAGs using regex."""
logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
signal.signal(signal.SIGTERM, sigint_handler)

_create_backfill(
dag_id=args.dag,
from_date=args.from_date,
to_date=args.to_date,
max_active_runs=args.max_active_runs,
reverse=args.run_backwards,
dag_run_conf=args.dag_run_conf,
)
102 changes: 2 additions & 100 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,136 +23,38 @@
import json
import logging
import operator
import signal
import subprocess
import sys
from typing import TYPE_CHECKING

import re2
from sqlalchemy import delete, select

from airflow import settings
from airflow.api.client import get_current_api_client
from airflow.api_connexion.schemas.dag_schema import dag_schema
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.jobs.job import Job
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
from airflow.models.dag import DAG
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import cli as cli_utils, timezone
from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
from airflow.utils.cli import get_dag, process_subdir, suppress_logs_and_warning
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
from airflow.utils.helpers import ask_yesno
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType

if TYPE_CHECKING:
from graphviz.dot import Dot
from sqlalchemy.orm import Session

from airflow.models.dag import DAG
from airflow.timetables.base import DataInterval

log = logging.getLogger(__name__)


def _run_dag_backfill(dags: list[DAG], args) -> None:
# If only one date is passed, using same as start and end
args.end_date = args.end_date or args.start_date
args.start_date = args.start_date or args.end_date

run_conf = None
if args.conf:
run_conf = json.loads(args.conf)

for dag in dags:
if args.task_regex:
dag = dag.partial_subset(
task_ids_or_regex=args.task_regex, include_upstream=not args.ignore_dependencies
)
if not dag.task_dict:
raise AirflowException(
f"There are no tasks that match '{args.task_regex}' regex. Nothing to run, exiting..."
)

if args.dry_run:
print(f"Dry run of DAG {dag.dag_id} on {args.start_date}")
dagrun_infos = dag.iter_dagrun_infos_between(earliest=args.start_date, latest=args.end_date)
for dagrun_info in dagrun_infos:
dr = DagRun(
dag.dag_id,
execution_date=dagrun_info.logical_date,
data_interval=dagrun_info.data_interval,
triggered_by=DagRunTriggeredByType.CLI,
)

for task in dag.tasks:
print(f"Task {task.task_id} located in DAG {dag.dag_id}")
ti = TaskInstance(task, run_id=None)
ti.dag_run = dr
ti.dry_run()
else:
if args.reset_dagruns:
DAG.clear_dags(
[dag],
start_date=args.start_date,
end_date=args.end_date,
confirm_prompt=not args.yes,
dag_run_state=DagRunState.QUEUED,
)

try:
dag.run(
start_date=args.start_date,
end_date=args.end_date,
mark_success=args.mark_success,
local=args.local,
donot_pickle=(args.donot_pickle or conf.getboolean("core", "donot_pickle")),
ignore_first_depends_on_past=args.ignore_first_depends_on_past,
ignore_task_deps=args.ignore_dependencies,
pool=args.pool,
delay_on_limit_secs=args.delay_on_limit,
verbose=args.verbose,
conf=run_conf,
rerun_failed_tasks=args.rerun_failed_tasks,
run_backwards=args.run_backwards,
continue_on_failures=args.continue_on_failures,
disable_retry=args.disable_retry,
)
except ValueError as vr:
print(str(vr))
sys.exit(1)


@cli_utils.action_cli
@providers_configuration_loaded
def dag_backfill(args, dag: list[DAG] | DAG | None = None) -> None:
"""Create backfill job or dry run for a DAG or list of DAGs using regex."""
logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
signal.signal(signal.SIGTERM, sigint_handler)
args.ignore_first_depends_on_past = True

if not args.start_date and not args.end_date:
raise AirflowException("Provide a start_date and/or end_date")

if not dag:
dags = get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_id_as_regex)
elif isinstance(dag, list):
dags = dag
else:
dags = [dag]
del dag

dags.sort(key=lambda d: d.dag_id)
_run_dag_backfill(dags, args)
if len(dags) > 1:
log.info("All of the backfills are done.")


@cli_utils.action_cli
@providers_configuration_loaded
def dag_trigger(args) -> None:
Expand Down
4 changes: 1 addition & 3 deletions airflow/executors/executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,8 @@ def set_default_executor(cls, executor: BaseExecutor) -> None:
"""
Externally set an executor to be the default.
This is used in rare cases such as dag.run which allows, as a user convenience, to provide
This is used in rare cases such as dag.test which allows, as a user convenience, to provide
the executor by cli/argument instead of Airflow configuration
todo: given comments above, is this needed anymore since DAG.run is removed?
"""
exec_class_name = executor.__class__.__qualname__
exec_name = ExecutorName(f"{executor.__module__}.{exec_class_name}")
Expand Down
3 changes: 0 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2293,9 +2293,6 @@ def _remove_task(self, task_id: str) -> None:

self.task_count = len(self.task_dict)

def run(self, *args, **kwargs):
"""Leaving this here to be removed in other PR for simpler review."""

def cli(self):
"""Exposes a CLI specific to this DAG."""
check_cycle(self)
Expand Down
Loading

0 comments on commit b9b24f4

Please sign in to comment.