Move TaskInstance heartbeat directly on to TI row, not on Job row (apache#43599)

This is part of the work for the AIP-72 epic, but is done as a separate PR for
ease of review.

This PR by itself doesn't remove the LocalTaskJob row (that will happen in a
future PR when the execution code is moved over to live in the TaskSDK), but it
paves the way for that. The reasons for this change are:

- Having a separate row for tracking the TI heartbeat doesn't buy us much
- With the addition of TaskInstanceHistory we don't need _another_ separate
  record of when/where TIs were run
- It simplifies things (one less join when finding zombies)
- It makes zombie tracking easier -- it now depends only on TI state, not on
  the combination of TI and Job state (see the sketch after this list)
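As a rough before/after illustration of that last bullet (a simplified sketch, not
the PR's exact code -- the real query is in the scheduler_job_runner.py hunk below;
the five-minute threshold stands in for the configured zombie threshold):

    from datetime import timedelta

    from sqlalchemy import or_, select

    from airflow.jobs.job import Job
    from airflow.models.taskinstance import TaskInstance as TI
    from airflow.utils import timezone
    from airflow.utils.state import JobState, TaskInstanceState

    limit_dttm = timezone.utcnow() - timedelta(minutes=5)

    # Before: a running TI was a zombie if its LocalTaskJob row had stopped
    # heartbeating, which required joining task_instance to the job table.
    old_zombie_query = (
        select(TI)
        .join(Job, TI.job_id == Job.id)
        .where(TI.state == TaskInstanceState.RUNNING)
        .where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat < limit_dttm))
        .where(Job.job_type == "LocalTaskJob")
    )

    # After: the heartbeat lives on the TI itself, so the check is a
    # single-table predicate (and now also covers RESTARTING).
    new_zombie_query = select(TI).where(
        TI.state.in_((TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING)),
        TI.last_heartbeat_at < limit_dttm,
    )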
ashb authored Nov 3, 2024
1 parent ddc5670 commit 6fd7052
Showing 26 changed files with 294 additions and 280 deletions.
2 changes: 0 additions & 2 deletions airflow/cli/cli_config.py
@@ -580,7 +580,6 @@ def string_lower_type(val):
     ("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the worker", action="store_true"
 )
 ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the entire dag (used internally)")
-ARG_JOB_ID = Arg(("-j", "--job-id"), help=argparse.SUPPRESS)
 ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg")
 ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task index")
 ARG_READ_FROM_DB = Arg(("--read-from-db",), help="Read dag from DB instead of dag file", action="store_true")
@@ -1354,7 +1353,6 @@ class GroupCommand(NamedTuple):
     ARG_DEPENDS_ON_PAST,
     ARG_SHIP_DAG,
     ARG_PICKLE,
-    ARG_JOB_ID,
     ARG_INTERACTIVE,
     ARG_SHUT_DOWN_LOGGING,
     ARG_MAP_INDEX,
1 change: 0 additions & 1 deletion airflow/cli/commands/task_command.py
@@ -341,7 +341,6 @@ def _run_raw_task(args, ti: TaskInstance) -> None | TaskReturnCode:
     """Run the main task handling code."""
     return ti._run_raw_task(
         mark_success=args.mark_success,
-        job_id=args.job_id,
         pool=args.pool,
     )

2 changes: 1 addition & 1 deletion airflow/executors/debug_executor.py
@@ -84,7 +84,7 @@ def _run_task(self, ti: TaskInstance) -> bool:
         key = ti.key
         try:
             params = self.tasks_params.pop(ti.key, {})
-            ti.run(job_id=ti.job_id, **params)
+            ti.run(**params)
             self.success(key)
             return True
         except Exception as e:
3 changes: 2 additions & 1 deletion airflow/jobs/local_task_job_runner.py
@@ -159,7 +159,6 @@ def sigusr2_debug_handler(signum, frame):
             wait_for_past_depends_before_skipping=self.wait_for_past_depends_before_skipping,
             ignore_task_deps=self.ignore_task_deps,
             ignore_ti_state=self.ignore_ti_state,
-            job_id=str(self.job.id),
             pool=self.pool,
             external_executor_id=self.external_executor_id,
         ):
@@ -319,6 +318,8 @@ def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
                     "Recorded pid %s does not match the current pid %s", recorded_pid, current_pid
                 )
                 raise AirflowException("PID of job runner does not match")
+            ti.update_heartbeat()
+
         elif self.task_runner.return_code() is None and hasattr(self.task_runner, "process"):
             self._overtime = (timezone.utcnow() - (ti.end_date or timezone.utcnow())).total_seconds()
             if ti.state == TaskInstanceState.SKIPPED:
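The `ti.update_heartbeat()` call added above is the crux: the liveness stamp now
goes straight onto the task_instance row. The method body isn't part of this
excerpt; a minimal, hypothetical sketch of what such a method could look like,
assuming the new column and Airflow's session helpers:

    from sqlalchemy import update

    from airflow.models.taskinstance import TaskInstance
    from airflow.utils import timezone
    from airflow.utils.session import NEW_SESSION, provide_session


    @provide_session
    def update_heartbeat(ti: TaskInstance, session=NEW_SESSION) -> None:
        # Hypothetical sketch of a TI-level heartbeat: stamp last_heartbeat_at
        # on the task_instance row identified by the TI's composite key.
        session.execute(
            update(TaskInstance)
            .where(
                TaskInstance.dag_id == ti.dag_id,
                TaskInstance.task_id == ti.task_id,
                TaskInstance.run_id == ti.run_id,
                TaskInstance.map_index == ti.map_index,
            )
            .values(last_heartbeat_at=timezone.utcnow())
        )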
18 changes: 7 additions & 11 deletions airflow/jobs/scheduler_job_runner.py
@@ -30,7 +30,7 @@
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator

-from sqlalchemy import and_, delete, exists, func, not_, or_, select, text, update
+from sqlalchemy import and_, delete, exists, func, not_, select, text, update
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
 from sqlalchemy.sql import expression
@@ -777,7 +777,7 @@ def process_executor_events(
             "TaskInstance Finished: dag_id=%s, task_id=%s, run_id=%s, map_index=%s, "
             "run_start_date=%s, run_end_date=%s, "
             "run_duration=%s, state=%s, executor=%s, executor_state=%s, try_number=%s, max_tries=%s, "
-            "job_id=%s, pool=%s, queue=%s, priority_weight=%d, operator=%s, queued_dttm=%s, "
+            "pool=%s, queue=%s, priority_weight=%d, operator=%s, queued_dttm=%s, "
             "queued_by_job_id=%s, pid=%s"
         )
         cls.logger().info(
@@ -794,7 +794,6 @@
             state,
             try_number,
             ti.max_tries,
-            ti.job_id,
             ti.pool,
             ti.queue,
             ti.priority_weight,
@@ -821,7 +820,6 @@
             span.set_attribute("operator", str(ti.operator))
             span.set_attribute("try_number", ti.try_number)
             span.set_attribute("executor_state", state)
-            span.set_attribute("job_id", ti.job_id)
             span.set_attribute("pool", ti.pool)
             span.set_attribute("queue", ti.queue)
             span.set_attribute("priority_weight", ti.priority_weight)
@@ -1977,22 +1975,20 @@ def _find_and_purge_zombies(self) -> None:
                 self._purge_zombies(zombies, session=session)

     def _find_zombies(self, *, session: Session) -> list[tuple[TI, str, str]]:
-        from airflow.jobs.job import Job
-
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
         limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
         zombies = session.execute(
             select(TI, DM.fileloc, DM.processor_subdir)
             .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
-            .join(Job, TI.job_id == Job.id)
             .join(DM, TI.dag_id == DM.dag_id)
-            .where(TI.state == TaskInstanceState.RUNNING)
-            .where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat < limit_dttm))
-            .where(Job.job_type == "LocalTaskJob")
+            .where(
+                TI.state.in_((TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING)),
+                TI.last_heartbeat_at < limit_dttm,
+            )
             .where(TI.queued_by_job_id == self.job.id)
         ).all()
         if zombies:
-            self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
+            self.log.warning("Failing %s TIs without heartbeat after %s", len(zombies), limit_dttm)
         return zombies

     def _purge_zombies(self, zombies: list[tuple[TI, str, str]], *, session: Session) -> None:
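A quick way to exercise the new detection (a test-style sketch; `ti`, `session`,
and `runner` -- a SchedulerJobRunner -- are assumed to exist already):

    from datetime import timedelta

    from airflow.utils import timezone
    from airflow.utils.state import TaskInstanceState

    # Make the TI look alive-but-silent: running, with a stale heartbeat,
    # and queued by this scheduler so the query picks it up.
    ti.state = TaskInstanceState.RUNNING
    ti.last_heartbeat_at = timezone.utcnow() - timedelta(minutes=10)
    ti.queued_by_job_id = runner.job.id
    session.merge(ti)
    session.flush()

    zombies = runner._find_zombies(session=session)
    assert any(z.task_id == ti.task_id for z, _, _ in zombies)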
@@ -0,0 +1,60 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add last_heartbeat_at directly to TI.
Revision ID: d8cd3297971e
Revises: 5f57a45b8433
Create Date: 2024-11-01 12:14:59.927266
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

from airflow.migrations.db_types import TIMESTAMP

# revision identifiers, used by Alembic.
revision = "d8cd3297971e"
down_revision = "5f57a45b8433"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.add_column(sa.Column("last_heartbeat_at", TIMESTAMP(timezone=True), nullable=True))
batch_op.drop_index("ti_job_id")
batch_op.create_index("ti_heartbeat", ["last_heartbeat_at"], unique=False)
batch_op.drop_column("job_id")
with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
batch_op.drop_column("job_id")


def downgrade():
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.add_column(sa.Column("job_id", sa.INTEGER(), autoincrement=False, nullable=True))
batch_op.drop_index("ti_heartbeat")
batch_op.create_index("ti_job_id", ["job_id"], unique=False)
batch_op.drop_column("last_heartbeat_at")
with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
batch_op.add_column(sa.Column("job_id", sa.INTEGER(), autoincrement=False, nullable=True))