diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 15543023cbf6..06ac2f7bd817 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -580,7 +580,6 @@ def string_lower_type(val):
("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the worker", action="store_true"
)
ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the entire dag (used internally)")
-ARG_JOB_ID = Arg(("-j", "--job-id"), help=argparse.SUPPRESS)
ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg")
ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task index")
ARG_READ_FROM_DB = Arg(("--read-from-db",), help="Read dag from DB instead of dag file", action="store_true")
@@ -1354,7 +1353,6 @@ class GroupCommand(NamedTuple):
ARG_DEPENDS_ON_PAST,
ARG_SHIP_DAG,
ARG_PICKLE,
- ARG_JOB_ID,
ARG_INTERACTIVE,
ARG_SHUT_DOWN_LOGGING,
ARG_MAP_INDEX,
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 23f6e1abbe5e..03d2737072f3 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -341,7 +341,6 @@ def _run_raw_task(args, ti: TaskInstance) -> None | TaskReturnCode:
"""Run the main task handling code."""
return ti._run_raw_task(
mark_success=args.mark_success,
- job_id=args.job_id,
pool=args.pool,
)
diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py
index 80fb673cab84..aead7e2b2c11 100644
--- a/airflow/executors/debug_executor.py
+++ b/airflow/executors/debug_executor.py
@@ -84,7 +84,7 @@ def _run_task(self, ti: TaskInstance) -> bool:
key = ti.key
try:
params = self.tasks_params.pop(ti.key, {})
- ti.run(job_id=ti.job_id, **params)
+ ti.run(**params)
self.success(key)
return True
except Exception as e:
diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py
index a33005b0a52c..c900c88674e7 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -159,7 +159,6 @@ def sigusr2_debug_handler(signum, frame):
wait_for_past_depends_before_skipping=self.wait_for_past_depends_before_skipping,
ignore_task_deps=self.ignore_task_deps,
ignore_ti_state=self.ignore_ti_state,
- job_id=str(self.job.id),
pool=self.pool,
external_executor_id=self.external_executor_id,
):
@@ -319,6 +318,8 @@ def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
"Recorded pid %s does not match the current pid %s", recorded_pid, current_pid
)
raise AirflowException("PID of job runner does not match")
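+ # Record the heartbeat directly on the TI row; zombie detection now reads
+ # TI.last_heartbeat_at instead of the LocalTaskJob's heartbeat.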
+ ti.update_heartbeat()
+
elif self.task_runner.return_code() is None and hasattr(self.task_runner, "process"):
self._overtime = (timezone.utcnow() - (ti.end_date or timezone.utcnow())).total_seconds()
if ti.state == TaskInstanceState.SKIPPED:
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index b763011e5500..39e4e35087bc 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -30,7 +30,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
-from sqlalchemy import and_, delete, exists, func, not_, or_, select, text, update
+from sqlalchemy import and_, delete, exists, func, not_, select, text, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
from sqlalchemy.sql import expression
@@ -777,7 +777,7 @@ def process_executor_events(
"TaskInstance Finished: dag_id=%s, task_id=%s, run_id=%s, map_index=%s, "
"run_start_date=%s, run_end_date=%s, "
"run_duration=%s, state=%s, executor=%s, executor_state=%s, try_number=%s, max_tries=%s, "
- "job_id=%s, pool=%s, queue=%s, priority_weight=%d, operator=%s, queued_dttm=%s, "
+ "pool=%s, queue=%s, priority_weight=%d, operator=%s, queued_dttm=%s, "
"queued_by_job_id=%s, pid=%s"
)
cls.logger().info(
@@ -794,7 +794,6 @@ def process_executor_events(
state,
try_number,
ti.max_tries,
- ti.job_id,
ti.pool,
ti.queue,
ti.priority_weight,
@@ -821,7 +820,6 @@ def process_executor_events(
span.set_attribute("operator", str(ti.operator))
span.set_attribute("try_number", ti.try_number)
span.set_attribute("executor_state", state)
- span.set_attribute("job_id", ti.job_id)
span.set_attribute("pool", ti.pool)
span.set_attribute("queue", ti.queue)
span.set_attribute("priority_weight", ti.priority_weight)
@@ -1977,22 +1975,20 @@ def _find_and_purge_zombies(self) -> None:
self._purge_zombies(zombies, session=session)
def _find_zombies(self, *, session: Session) -> list[tuple[TI, str, str]]:
- from airflow.jobs.job import Job
-
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
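+ # A zombie is now any RUNNING/RESTARTING TI whose own last_heartbeat_at is older
+ # than the threshold; no join to the Job table is needed.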
zombies = session.execute(
select(TI, DM.fileloc, DM.processor_subdir)
.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
- .join(Job, TI.job_id == Job.id)
.join(DM, TI.dag_id == DM.dag_id)
- .where(TI.state == TaskInstanceState.RUNNING)
- .where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat < limit_dttm))
- .where(Job.job_type == "LocalTaskJob")
+ .where(
+ TI.state.in_((TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING)),
+ TI.last_heartbeat_at < limit_dttm,
+ )
.where(TI.queued_by_job_id == self.job.id)
).all()
if zombies:
- self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
+ self.log.warning("Failing %s TIs without heartbeat after %s", len(zombies), limit_dttm)
return zombies
def _purge_zombies(self, zombies: list[tuple[TI, str, str]], *, session: Session) -> None:
diff --git a/airflow/migrations/versions/0045_3_0_0_add_last_heartbeat_at_directly_to_ti.py b/airflow/migrations/versions/0045_3_0_0_add_last_heartbeat_at_directly_to_ti.py
new file mode 100644
index 000000000000..47e72de9dcb4
--- /dev/null
+++ b/airflow/migrations/versions/0045_3_0_0_add_last_heartbeat_at_directly_to_ti.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add last_heartbeat_at directly to TI.
+
+Revision ID: d8cd3297971e
+Revises: 5f57a45b8433
+Create Date: 2024-11-01 12:14:59.927266
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.migrations.db_types import TIMESTAMP
+
+# revision identifiers, used by Alembic.
+revision = "d8cd3297971e"
+down_revision = "5f57a45b8433"
+branch_labels = None
+depends_on = None
+airflow_version = "3.0.0"
+
+
+def upgrade():
+ with op.batch_alter_table("task_instance", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("last_heartbeat_at", TIMESTAMP(timezone=True), nullable=True))
+ batch_op.drop_index("ti_job_id")
+ batch_op.create_index("ti_heartbeat", ["last_heartbeat_at"], unique=False)
+ batch_op.drop_column("job_id")
+ with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
+ batch_op.drop_column("job_id")
+
+
+def downgrade():
+ with op.batch_alter_table("task_instance", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("job_id", sa.INTEGER(), autoincrement=False, nullable=True))
+ batch_op.drop_index("ti_heartbeat")
+ batch_op.create_index("ti_job_id", ["job_id"], unique=False)
+ batch_op.drop_column("last_heartbeat_at")
+ with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("job_id", sa.INTEGER(), autoincrement=False, nullable=True))
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index bb07ba6d848a..e86c47778246 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -133,7 +133,7 @@
tuple_in_condition,
with_row_locks,
)
-from airflow.utils.state import DagRunState, JobState, State, TaskInstanceState
+from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.task_group import MappedTaskGroup
from airflow.utils.task_instance_session import set_current_task_instance_session
from airflow.utils.timeout import timeout
@@ -221,11 +221,16 @@ def _add_log(
)
+@internal_api_call
+@provide_session
+def _update_ti_heartbeat(id: str, when: datetime, session: Session = NEW_SESSION):
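+ """Set last_heartbeat_at to the given time for the TaskInstance with this id."""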
+ session.execute(update(TaskInstance).where(TaskInstance.id == id).values(last_heartbeat_at=when))
+
+
def _run_raw_task(
ti: TaskInstance | TaskInstancePydantic,
mark_success: bool = False,
test_mode: bool = False,
- job_id: str | None = None,
pool: str | None = None,
raise_on_defer: bool = False,
session: Session | None = None,
@@ -249,7 +254,6 @@ def _run_raw_task(
ti.test_mode = test_mode
ti.refresh_from_task(ti.task, pool_override=pool)
ti.refresh_from_db(session=session)
- ti.job_id = job_id
ti.hostname = get_hostname()
ti.pid = os.getpid()
if not test_mode:
@@ -451,7 +455,6 @@ def clear_task_instances(
If set to False, DagRuns state will not be changed.
:param dag: DAG object
"""
- job_ids = []
# Keys: dag_id -> run_id -> map_indexes -> try_numbers -> task_id
task_id_by_key: dict[str, dict[str, dict[int, dict[int, set[str]]]]] = defaultdict(
lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
@@ -462,11 +465,9 @@ def clear_task_instances(
for ti in tis:
TaskInstanceHistory.record_ti(ti, session)
if ti.state == TaskInstanceState.RUNNING:
- if ti.job_id:
- # If a task is cleared when running, set its state to RESTARTING so that
- # the task is terminated and becomes eligible for retry.
- ti.state = TaskInstanceState.RESTARTING
- job_ids.append(ti.job_id)
+ # If a task is cleared when running, set its state to RESTARTING so that
+ # the task is terminated and becomes eligible for retry.
+ ti.state = TaskInstanceState.RESTARTING
else:
ti_dag = dag if dag and dag.dag_id == ti.dag_id else dag_bag.get_dag(ti.dag_id, session=session)
task_id = ti.task_id
@@ -522,11 +523,6 @@ def clear_task_instances(
delete_qry = TR.__table__.delete().where(conditions)
session.execute(delete_qry)
- if job_ids:
- from airflow.jobs.job import Job
-
- session.execute(update(Job).where(Job.id.in_(job_ids)).values(state=JobState.RESTARTING))
-
if dag_run_state is not False and tis:
from airflow.models.dagrun import DagRun # Avoid circular import
@@ -806,7 +802,6 @@ def _set_ti_attrs(target, source, include_dag_run=False):
target.max_tries = source.max_tries
target.hostname = source.hostname
target.unixname = source.unixname
- target.job_id = source.job_id
target.pool = source.pool
target.pool_slots = source.pool_slots or 1
target.queue = source.queue
@@ -815,6 +810,7 @@ def _set_ti_attrs(target, source, include_dag_run=False):
target.custom_operator_name = source.custom_operator_name
target.queued_dttm = source.queued_dttm
target.queued_by_job_id = source.queued_by_job_id
+ target.last_heartbeat_at = source.last_heartbeat_at
target.pid = source.pid
target.executor = source.executor
target.executor_config = source.executor_config
@@ -1844,7 +1840,6 @@ class TaskInstance(Base, LoggingMixin):
max_tries = Column(Integer, server_default=text("-1"))
hostname = Column(String(1000))
unixname = Column(String(1000))
- job_id = Column(Integer)
pool = Column(String(256), nullable=False)
pool_slots = Column(Integer, default=1, nullable=False)
queue = Column(String(256))
@@ -1853,6 +1848,8 @@ class TaskInstance(Base, LoggingMixin):
custom_operator_name = Column(String(1000))
queued_dttm = Column(UtcDateTime)
queued_by_job_id = Column(Integer)
+
+ last_heartbeat_at = Column(UtcDateTime)
pid = Column(Integer)
executor = Column(String(1000))
executor_config = Column(ExecutorConfigType(pickler=dill))
@@ -1885,8 +1882,8 @@ class TaskInstance(Base, LoggingMixin):
Index("ti_state", state),
Index("ti_state_lkp", dag_id, task_id, run_id, state),
Index("ti_pool", pool, state, priority_weight),
- Index("ti_job_id", job_id),
Index("ti_trigger_id", trigger_id),
+ Index("ti_heartbeat", last_heartbeat_at),
PrimaryKeyConstraint("id", name="task_instance_pkey"),
UniqueConstraint("dag_id", "task_id", "run_id", "map_index", name="task_instance_composite_key"),
ForeignKeyConstraint(
@@ -2035,7 +2032,6 @@ def _command_as_list(
local: bool = False,
pickle_id: int | None = None,
raw: bool = False,
- job_id: str | None = None,
pool: str | None = None,
cfg_path: str | None = None,
) -> list[str]:
@@ -2074,7 +2070,6 @@ def _command_as_list(
pickle_id=pickle_id,
file_path=path,
raw=raw,
- job_id=job_id,
pool=pool,
cfg_path=cfg_path,
map_index=ti.map_index,
@@ -2091,7 +2086,6 @@ def command_as_list(
local: bool = False,
pickle_id: int | None = None,
raw: bool = False,
- job_id: str | None = None,
pool: str | None = None,
cfg_path: str | None = None,
) -> list[str]:
@@ -2111,7 +2105,6 @@ def command_as_list(
local=local,
pickle_id=pickle_id,
raw=raw,
- job_id=job_id,
pool=pool,
cfg_path=cfg_path,
)
@@ -2131,7 +2124,6 @@ def generate_command(
pickle_id: int | None = None,
file_path: PurePath | str | None = None,
raw: bool = False,
- job_id: str | None = None,
pool: str | None = None,
cfg_path: str | None = None,
map_index: int = -1,
@@ -2156,7 +2148,6 @@ def generate_command(
associated with the pickled DAG
:param file_path: path to the file containing the DAG definition
:param raw: raw mode (needs more details)
- :param job_id: job ID (needs more details)
:param pool: the Airflow pool that the task should run in
:param cfg_path: the Path to the configuration file
:return: shell command that can be used to run the task instance
@@ -2166,8 +2157,6 @@ def generate_command(
cmd.extend(["--mark-success"])
if pickle_id:
cmd.extend(["--pickle", str(pickle_id)])
- if job_id:
- cmd.extend(["--job-id", str(job_id)])
if ignore_all_deps:
cmd.extend(["--ignore-all-dependencies"])
if ignore_task_deps:
@@ -2641,7 +2630,6 @@ def _check_and_change_state_before_execution(
mark_success: bool = False,
test_mode: bool = False,
hostname: str = "",
- job_id: str | None = None,
pool: str | None = None,
external_executor_id: str | None = None,
session: Session = NEW_SESSION,
@@ -2661,7 +2649,6 @@ def _check_and_change_state_before_execution(
:param mark_success: Don't run the task, mark its state as success
:param test_mode: Doesn't record success or failure in the DB
:param hostname: The hostname of the worker running the task instance.
- :param job_id: Job (LocalTaskJob / SchedulerJob) ID
:param pool: specifies the pool to use to run the task instance
:param external_executor_id: The identifier of the celery executor
:param session: SQLAlchemy ORM Session
@@ -2684,7 +2671,6 @@ def _check_and_change_state_before_execution(
ti.refresh_from_task(task, pool_override=pool)
ti.test_mode = test_mode
ti.refresh_from_db(session=session, lock_for_update=True)
- ti.job_id = job_id
ti.hostname = hostname
ti.pid = None
@@ -2789,7 +2775,6 @@ def check_and_change_state_before_execution(
ignore_ti_state: bool = False,
mark_success: bool = False,
test_mode: bool = False,
- job_id: str | None = None,
pool: str | None = None,
external_executor_id: str | None = None,
session: Session = NEW_SESSION,
@@ -2805,7 +2790,6 @@ def check_and_change_state_before_execution(
mark_success=mark_success,
test_mode=test_mode,
hostname=get_hostname(),
- job_id=job_id,
pool=pool,
external_executor_id=external_executor_id,
session=session,
@@ -2876,7 +2860,6 @@ def _run_raw_task(
self,
mark_success: bool = False,
test_mode: bool = False,
- job_id: str | None = None,
pool: str | None = None,
raise_on_defer: bool = False,
session: Session = NEW_SESSION,
@@ -2901,7 +2884,6 @@ def _run_raw_task(
ti=self,
mark_success=mark_success,
test_mode=test_mode,
- job_id=job_id,
pool=pool,
raise_on_defer=raise_on_defer,
session=session,
@@ -3071,6 +3053,11 @@ def _execute_task(self, context: Context, task_orig: Operator):
"""
return _execute_task(self, context, task_orig)
+ def update_heartbeat(self):
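+ """Update the recorded heartbeat for this task to "now"."""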
+ cm = nullcontext() if InternalApiConfig.get_use_internal_api() else create_session()
+ with cm as session_or_null:
+ _update_ti_heartbeat(self.id, timezone.utcnow(), session_or_null)
+
@provide_session
def defer_task(self, exception: TaskDeferred | None, session: Session = NEW_SESSION) -> None:
"""
@@ -3101,7 +3088,6 @@ def run(
ignore_ti_state: bool = False,
mark_success: bool = False,
test_mode: bool = False,
- job_id: str | None = None,
pool: str | None = None,
session: Session = NEW_SESSION,
raise_on_defer: bool = False,
@@ -3116,7 +3102,6 @@ def run(
ignore_ti_state=ignore_ti_state,
mark_success=mark_success,
test_mode=test_mode,
- job_id=job_id,
pool=pool,
session=session,
)
@@ -3126,7 +3111,6 @@ def run(
self._run_raw_task(
mark_success=mark_success,
test_mode=test_mode,
- job_id=job_id,
pool=pool,
session=session,
raise_on_defer=raise_on_defer,
diff --git a/airflow/models/taskinstancehistory.py b/airflow/models/taskinstancehistory.py
index ccdca700af6e..8c77daf92579 100644
--- a/airflow/models/taskinstancehistory.py
+++ b/airflow/models/taskinstancehistory.py
@@ -70,7 +70,6 @@ class TaskInstanceHistory(Base):
max_tries = Column(Integer, server_default=text("-1"))
hostname = Column(String(1000))
unixname = Column(String(1000))
- job_id = Column(Integer)
pool = Column(String(256), nullable=False)
pool_slots = Column(Integer, default=1, nullable=False)
queue = Column(String(256))
diff --git a/airflow/serialization/pydantic/taskinstance.py b/airflow/serialization/pydantic/taskinstance.py
index caf44bea4c67..bf121353ca80 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -40,6 +40,7 @@
)
from airflow.serialization.pydantic.dag import DagModelPydantic
from airflow.serialization.pydantic.dag_run import DagRunPydantic
+from airflow.utils import timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.xcom import XCOM_RETURN_KEY
@@ -83,6 +84,7 @@ def validated_operator(x: dict[str, Any] | Operator, _info: ValidationInfo) -> A
class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
"""Serializable representation of the TaskInstance ORM SqlAlchemyModel used by internal API."""
+ id: str
task_id: str
dag_id: str
run_id: str
@@ -96,7 +98,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
max_tries: int
hostname: str
unixname: str
- job_id: Optional[int]
pool: str
pool_slots: int
queue: str
@@ -105,6 +106,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
custom_operator_name: Optional[str]
queued_dttm: Optional[datetime]
queued_by_job_id: Optional[int]
+ last_heartbeat_at: Optional[datetime] = None
pid: Optional[int]
executor: Optional[str]
executor_config: Any
@@ -138,7 +140,6 @@ def _run_raw_task(
self,
mark_success: bool = False,
test_mode: bool = False,
- job_id: str | None = None,
pool: str | None = None,
raise_on_defer: bool = False,
session: Session | None = None,
@@ -147,7 +148,6 @@ def _run_raw_task(
ti=self,
mark_success=mark_success,
test_mode=test_mode,
- job_id=job_id,
pool=pool,
raise_on_defer=raise_on_defer,
session=session,
@@ -252,6 +252,12 @@ def refresh_from_db(self, session: Session | None = None, lock_for_update: bool
_refresh_from_db(task_instance=self, session=session, lock_for_update=lock_for_update)
+ def update_heartbeat(self):
+ """Update the recorded heartbeat for this task to "now"."""
+ from airflow.models.taskinstance import _update_ti_heartbeat
+
+ return _update_ti_heartbeat(self.id, timezone.utcnow())
+
def set_duration(self) -> None:
"""Set task instance duration."""
from airflow.models.taskinstance import _set_duration
@@ -441,7 +447,6 @@ def check_and_change_state_before_execution(
ignore_ti_state: bool = False,
mark_success: bool = False,
test_mode: bool = False,
- job_id: str | None = None,
pool: str | None = None,
external_executor_id: str | None = None,
session: Session | None = None,
@@ -457,7 +462,6 @@ def check_and_change_state_before_execution(
mark_success=mark_success,
test_mode=test_mode,
hostname=get_hostname(),
- job_id=job_id,
pool=pool,
external_executor_id=external_executor_id,
session=session,
@@ -484,7 +488,6 @@ def command_as_list(
local: bool = False,
pickle_id: int | None = None,
raw: bool = False,
- job_id: str | None = None,
pool: str | None = None,
cfg_path: str | None = None,
) -> list[str]:
@@ -504,7 +507,6 @@ def command_as_list(
local=local,
pickle_id=pickle_id,
raw=raw,
- job_id=job_id,
pool=pool,
cfg_path=cfg_path,
)
diff --git a/airflow/task/standard_task_runner.py b/airflow/task/standard_task_runner.py
index d7f75f40e17d..a5641002c961 100644
--- a/airflow/task/standard_task_runner.py
+++ b/airflow/task/standard_task_runner.py
@@ -101,7 +101,6 @@ def __init__(self, job_runner: LocalTaskJobRunner):
raw=True,
pickle_id=self.job_runner.pickle_id,
mark_success=self.job_runner.mark_success,
- job_id=self.job_runner.job.id,
pool=self.job_runner.pool,
cfg_path=cfg_path,
)
@@ -159,15 +158,10 @@ def _start_by_fork(self):
# [1:] - remove "airflow" from the start of the command
args = parser.parse_args(self._command[1:])
- # We prefer the job_id passed on the command-line because at this time, the
- # task instance may not have been updated.
- job_id = getattr(args, "job_id", self._task_instance.job_id)
self.log.info("Running: %s", self._command)
- self.log.info("Job %s: Subtask %s", job_id, self._task_instance.task_id)
+ self.log.info("Subtask %s", self._task_instance.task_id)
proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date_or_run_id}"
- if job_id is not None:
- proc_title += " {0.job_id}"
setproctitle(proc_title.format(args))
return_code = 0
try:
@@ -179,15 +173,11 @@ def _start_by_fork(self):
return_code = 0
if isinstance(ret, TaskReturnCode):
return_code = ret.value
- except Exception as exc:
+ except Exception:
return_code = 1
self.log.exception(
- "Failed to execute job %s for task %s (%s; %r)",
- job_id,
- self._task_instance.task_id,
- exc,
- os.getpid(),
+ "Failed to execute task_id=%s pid=%r", self._task_instance.task_id, os.getpid()
)
except SystemExit as sys_ex:
# Someone called sys.exit() in the fork - mistakenly. You should not run sys.exit() in
@@ -250,10 +240,10 @@ def terminate(self):
if self._rc == -signal.SIGKILL:
self.log.error(
(
- "Job %s was killed before it finished (likely due to running out of memory)",
+ "TI %s was killed before it finished (likely due to running out of memory)",
"For more information, see https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#LocalTaskJob-killed",
),
- self._task_instance.job_id,
+ self._task_instance.id,
)
def get_process_pid(self) -> int:
@@ -286,8 +276,7 @@ def _read_task_logs(self, stream):
if not line:
break
self.log.info(
- "Job %s: Subtask %s %s",
- self._task_instance.job_id,
+ "Task %s %s",
self._task_instance.task_id,
line.rstrip("\n"),
)
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 2c82f5f1948b..dd3e8c5d2002 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -97,7 +97,7 @@ class MappedClassProtocol(Protocol):
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"2.10.3": "5f2621c13b39",
- "3.0.0": "5f57a45b8433",
+ "3.0.0": "d8cd3297971e",
}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d50d7bb2e78e..e287c027a894 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -5121,7 +5121,6 @@ class TaskInstanceModelView(AirflowModelView):
"end_date",
"duration",
"note",
- "job_id",
"hostname",
"unixname",
"priority_weight",
@@ -5146,7 +5145,6 @@ class TaskInstanceModelView(AirflowModelView):
"end_date",
"duration",
# "note", # TODO: Maybe figure out how to re-enable this.
- "job_id",
"hostname",
"unixname",
"priority_weight",
@@ -5192,7 +5190,7 @@ class TaskInstanceModelView(AirflowModelView):
edit_form = TaskInstanceEditForm
- base_order = ("job_id", "asc")
+ base_order = ("queued_dttm", "asc")
base_filters = [["dag_id", DagFilter, list]]
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index f278eee7d05e..8adffd106eae 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-9b9dcf915eff051a5cd77176a78bdcca3703b227373efe83fd0a1d4d05623c28
\ No newline at end of file
+1d781ee92cc59e7647d7f72ddc542b7f17e03fc8b822950db74415c38279d40f
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index 177c5a60f14e..1b0d5b346c95 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
[Regenerated ERD SVG (diagram markup omitted): the task_instance entity now shows last_heartbeat_at [TIMESTAMP] in place of job_id [INTEGER], and the task_instance_history entity is re-rendered without its job_id column; the task_instance--task_instance_history relationship edges are unchanged.]
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index f3441ceaf72b..f133a67e08ef 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
-| ``5f57a45b8433`` (head) | ``486ac7936b78`` | ``3.0.0`` | Drop task_fail table. |
+| ``d8cd3297971e`` (head) | ``5f57a45b8433`` | ``3.0.0`` | Add last_heartbeat_at directly to TI. |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``5f57a45b8433`` | ``486ac7936b78`` | ``3.0.0`` | Drop task_fail table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``486ac7936b78`` | ``d59cbbef95eb`` | ``3.0.0`` | remove scheduler_lock column. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git a/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py b/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py
index 1dfc51a0a040..16c1df48a9e8 100755
--- a/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py
+++ b/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py
@@ -52,6 +52,8 @@ def compare_attributes(path1, path2):
"triggerer_job",
"note",
"rendered_task_instance_fields",
+ # Storing last heartbeat for historic TIs is not interesting/useful
+ "last_heartbeat_at",
} # exclude attrs not necessary to be in TaskInstanceHistory
if not diff:
return
diff --git a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
index 237ef5910c78..68ecd1e83898 100644
--- a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
@@ -77,7 +77,6 @@ def setup_attrs(self, configured_app) -> None:
"duration": 10000,
"pool": "default_pool",
"queue": "default_queue",
- "job_id": 0,
}
self.app = configured_app
self.client = self.app.test_client() # type:ignore
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index bc8836981d42..e1fa6d13b748 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -81,7 +81,6 @@ def setup_attrs(self, configured_app, dagbag) -> None:
"duration": 10000,
"pool": "default_pool",
"queue": "default_queue",
- "job_id": 0,
}
self.app = configured_app
self.client = self.app.test_client() # type:ignore
diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
index fa9cc0b161d0..717f17ca278a 100644
--- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -68,7 +68,6 @@ def setup_attrs(self, session) -> None:
"duration": 10000,
"pool": "default_pool",
"queue": "default_queue",
- "job_id": 0,
}
clear_db_runs()
clear_rendered_ti_fields()
diff --git a/tests/assets/test_manager.py b/tests/assets/test_manager.py
index eb12f281606e..3310502a97ba 100644
--- a/tests/assets/test_manager.py
+++ b/tests/assets/test_manager.py
@@ -59,6 +59,7 @@ def clear_assets():
@pytest.fixture
def mock_task_instance():
return TaskInstancePydantic(
+ id="1",
task_id="5",
dag_id="7",
run_id="11",
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index 9b605e818d8f..5a4e0b279242 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -932,7 +932,7 @@ def test_logging_with_run_task_subprocess(self, session):
print(logs) # In case of a test failures this line would show detailed log
logs_list = logs.splitlines()
- assert f"Subtask {self.task_id}" in logs
+ assert f"Task {self.task_id}" in logs
assert "standard_task_runner.py" in logs
self.assert_log_line("Log from DAG Logger", logs_list)
self.assert_log_line("Log from TI Logger", logs_list)
diff --git a/tests/executors/test_debug_executor.py b/tests/executors/test_debug_executor.py
index 20ee821842c8..a8ad66795767 100644
--- a/tests/executors/test_debug_executor.py
+++ b/tests/executors/test_debug_executor.py
@@ -50,7 +50,7 @@ def test_run_task(self, task_instance_mock):
succeeded = executor._run_task(task_instance_mock)
assert succeeded
- task_instance_mock.run.assert_called_once_with(job_id=job_id)
+ task_instance_mock.run.assert_called()
def test_queue_task_instance(self):
key = "ti_key"
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 84a7465a8236..7ee037478833 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -131,17 +131,21 @@ def test_localtaskjob_essential_attr(self, dag_maker):
assert all(check_result_2)
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
- def test_localtaskjob_heartbeat(self, dag_maker):
+ def test_localtaskjob_heartbeat(self, dag_maker, time_machine):
session = settings.Session()
with dag_maker("test_localtaskjob_heartbeat"):
op1 = EmptyOperator(task_id="op1")
+ time_machine.move_to(DEFAULT_DATE, tick=False)
+
dr = dag_maker.create_dagrun()
ti = dr.get_task_instance(task_id=op1.task_id, session=session)
ti.state = State.RUNNING
ti.hostname = "blablabla"
session.commit()
+ assert ti.last_heartbeat_at is None, "Pre-condition check"
+
job1 = Job(dag_id=ti.dag_id, executor=SequentialExecutor())
job_runner = LocalTaskJobRunner(job=job1, task_instance=ti, ignore_ti_state=True)
ti.task = op1
@@ -149,9 +153,12 @@ def test_localtaskjob_heartbeat(self, dag_maker):
job1.task_runner = StandardTaskRunner(job_runner)
job1.task_runner.process = mock.Mock()
job_runner.task_runner = job1.task_runner
- with pytest.raises(AirflowException):
+ with pytest.raises(AirflowException, match="Hostname .* does not match"):
job_runner.heartbeat_callback()
+ ti = session.get(TaskInstance, (ti.id,))
+ assert ti.last_heartbeat_at is None, "Should still be none"
+
job1.task_runner.process.pid = 1
ti.state = State.RUNNING
ti.hostname = get_hostname()
@@ -164,19 +171,22 @@ def test_localtaskjob_heartbeat(self, dag_maker):
job_runner.heartbeat_callback(session=None)
job1.task_runner.process.pid = 2
- with pytest.raises(AirflowException):
+ with pytest.raises(AirflowException, match="PID .* does not match"):
job_runner.heartbeat_callback()
# Now, set the ti.pid to None and test that no error
# is raised.
ti.pid = None
- session.merge(ti)
+ ti = session.merge(ti)
session.commit()
assert ti.pid != job1.task_runner.process.pid
assert not ti.run_as_user
assert not job1.task_runner.run_as_user
job_runner.heartbeat_callback()
+ ti = session.get(TaskInstance, (ti.id,))
+ assert ti.last_heartbeat_at == DEFAULT_DATE
+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@mock.patch("subprocess.check_call")
@mock.patch("airflow.jobs.local_task_job_runner.psutil")
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 311de0ce2b64..da3ccc201eb4 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -50,7 +50,6 @@
from airflow.executors.executor_constants import MOCK_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.job import Job, run_job
-from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.models.asset import AssetActive, AssetDagRunQueue, AssetEvent, AssetModel
from airflow.models.backfill import Backfill, _create_backfill
@@ -68,7 +67,7 @@
from airflow.utils import timezone
from airflow.utils.file import list_py_file_paths
from airflow.utils.session import create_session, provide_session
-from airflow.utils.state import DagRunState, JobState, State, TaskInstanceState
+from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.types import DagRunType
from tests.listeners import dag_listener
@@ -5665,16 +5664,10 @@ def test_find_and_purge_zombies(self, load_examples, session):
for task_id in tasks_to_setup:
task = dag.get_task(task_id=task_id)
ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
- ti.queued_by_job_id = 999
-
- local_job = Job(dag_id=ti.dag_id)
- LocalTaskJobRunner(job=local_job, task_instance=ti)
- local_job.state = TaskInstanceState.FAILED
- session.add(local_job)
- session.flush()
+ ti.last_heartbeat_at = timezone.utcnow() - timedelta(minutes=6)
+ ti.queued_by_job_id = 999
- ti.job_id = local_job.id
session.add(ti)
session.flush()
@@ -5733,13 +5726,6 @@ def test_zombie_message(self, load_examples):
ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
ti.queued_by_job_id = 999
- local_job = Job(dag_id=ti.dag_id)
- local_job.state = TaskInstanceState.FAILED
-
- session.add(local_job)
- session.flush()
-
- ti.job_id = local_job.id
session.add(ti)
session.flush()
@@ -5795,17 +5781,11 @@ def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_proce
task = dag.get_task(task_id="run_this_last")
ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
-
- local_job = Job(dag_id=ti.dag_id)
- LocalTaskJobRunner(job=local_job, task_instance=ti)
- local_job.state = JobState.FAILED
- session.add(local_job)
- session.flush()
+ ti.last_heartbeat_at = timezone.utcnow() - timedelta(minutes=6)
# TODO: If there was an actual Relationship between TI and Job
# we wouldn't need this extra commit
session.add(ti)
- ti.job_id = local_job.id
session.flush()
scheduler_job = Job()
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index ccd19ad3272f..8a1df0594e4e 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -3993,7 +3993,6 @@ def test_refresh_from_db(self, create_task_instance):
"hostname": "some_unique_hostname",
"id": str(uuid6.uuid7()),
"unixname": "some_unique_unixname",
- "job_id": 1234,
"pool": "some_fake_pool_id",
"pool_slots": 25,
"queue": "some_queue_id",
@@ -4004,6 +4003,7 @@ def test_refresh_from_db(self, create_task_instance):
"rendered_map_index": None,
"queued_by_job_id": 321,
"pid": 123,
+ "last_heartbeat_at": run_date + datetime.timedelta(hours=1, seconds=4),
"executor": "some_executor",
"executor_config": {"Some": {"extra": "information"}},
"external_executor_id": "some_executor_id",
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index fabd104e8c26..19caafe55bc6 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -1109,7 +1109,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,
@@ -1145,7 +1145,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,
@@ -1181,7 +1181,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,
@@ -1217,7 +1217,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,
@@ -1253,7 +1253,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,
@@ -1289,7 +1289,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,
@@ -1325,7 +1325,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,