diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index 11b046648c5a..1e9d4a80dbd6 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -28,7 +28,6 @@
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DagModel
from airflow.models.errors import ParseImportError
-from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.db import get_sqla_model_classes
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState
@@ -64,11 +63,6 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
if dag is None:
raise DagNotFound(f"Dag id {dag_id} not found")
- # Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
- # There may be a lag, so explicitly removes serialized DAG here.
- if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
- SerializedDagModel.remove_dag(dag_id=dag_id, session=session)
-
count = 0
for model in get_sqla_model_classes():
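
With serialized_dag and dag_code now hanging off dag_version through `ondelete="CASCADE"` foreign keys (see the migration below, and the comment removed from serialized_dag.py), deleting the DagModel row is enough to clear the serialized data, which is why the explicit `SerializedDagModel.remove_dag` call above is gone. A minimal, self-contained SQLAlchemy sketch of that cascade behaviour, using illustrative Parent/Child models rather than Airflow's real ones:

```python
# Illustrative only: not Airflow's models. Shows why an explicit child delete
# becomes unnecessary once the child carries an ON DELETE CASCADE foreign key.
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Parent(Base):  # stands in for dag / dag_version
    __tablename__ = "parent"
    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)


class Child(Base):  # stands in for serialized_dag / dag_code
    __tablename__ = "child"
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("parent.id", ondelete="CASCADE"), nullable=False)


engine = create_engine("sqlite://")
# SQLite only enforces foreign keys when the pragma is switched on per connection.
event.listen(engine, "connect", lambda dbapi_conn, _: dbapi_conn.execute("PRAGMA foreign_keys=ON"))
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Parent(id=1, name="example_dag"))
    session.add(Child(id=1, parent_id=1))
    session.commit()

    # Deleting only the parent row is enough; the database removes the child.
    session.query(Parent).filter(Parent.id == 1).delete()
    session.commit()
    assert session.query(Child).count() == 0
```

The same reasoning applies to dag_code, which gets the same kind of `dag_version_id` foreign key in this change.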
diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index 1cdd2536e135..ea8d59e29ad3 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -37,7 +37,6 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.jobs.job import Job, most_recent_job
-from airflow.models.dagcode import DagCode
from airflow.models.taskinstance import _record_task_map_for_downstreams
from airflow.models.xcom_arg import _get_task_map_length
from airflow.sensors.base import _orig_start_date
@@ -94,7 +93,6 @@ def initialize_method_map() -> dict[str, Callable]:
_xcom_pull,
_record_task_map_for_downstreams,
trigger_dag,
- DagCode.remove_deleted_code,
DagModel.deactivate_deleted_dags,
DagModel.get_paused_dag_ids,
DagModel.get_current,
@@ -138,7 +136,6 @@ def initialize_method_map() -> dict[str, Callable]:
DagRun._get_log_template,
RenderedTaskInstanceFields._update_runtime_evaluated_template_fields,
SerializedDagModel.get_serialized_dag,
- SerializedDagModel.remove_deleted_dags,
SkipMixin._skip,
SkipMixin._skip_all_except,
TaskInstance._check_and_change_state_before_execution,
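
For context on why the method-count assertion in the tests shifts later in this diff: the internal API only dispatches calls whose target appears in this map, so dropping `DagCode.remove_deleted_code` and `SerializedDagModel.remove_deleted_dags` also removes them from the RPC surface. A hedged sketch of the registration/dispatch idea (simplified names and structure, not the actual endpoint code):

```python
# Simplified sketch, not the real rpc_api_endpoint implementation.
from typing import Any, Callable


def initialize_method_map(functions: list[Callable]) -> dict[str, Callable]:
    # Key each callable by "module.qualname"; only registered callables are reachable.
    return {f"{func.__module__}.{func.__qualname__}": func for func in functions}


def dispatch(method_map: dict[str, Callable], name: str, **params: Any) -> Any:
    try:
        handler = method_map[name]
    except KeyError:
        raise ValueError(f"Method {name!r} is not registered with the internal API")
    return handler(**params)
```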
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 597b09311928..3056a7fc1f4d 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -507,11 +507,7 @@ def deactivate_stale_dags(
stale_dag_threshold: int,
session: Session = NEW_SESSION,
):
- """
- Detect DAGs which are no longer present in files.
-
- Deactivate them and remove them in the serialized_dag table.
- """
+ """Detect and deactivate DAGs which are no longer present in files."""
to_deactivate = set()
query = select(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time).where(DagModel.is_active)
standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
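
The docstring change reflects that stale DAGs are now only deactivated; their serialized and source rows stay in place so historical DAG versions remain addressable. A stand-alone sketch of the stale-detection rule (stand-in data structures, not the scheduler's real query):

```python
# Stand-in sketch of the stale-DAG rule only; the real code queries DagModel rows.
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class DagRecord:  # stand-in for a (dag_id, fileloc, last_parsed_time) row
    dag_id: str
    last_parsed_time: datetime


def find_stale_dag_ids(records: list[DagRecord], stale_dag_threshold: int) -> set[str]:
    # A DAG not reparsed within the threshold is flagged for deactivation only.
    cutoff = datetime.now(timezone.utc) - timedelta(seconds=stale_dag_threshold)
    return {r.dag_id for r in records if r.last_parsed_time < cutoff}


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    rows = [
        DagRecord("fresh_dag", now),
        DagRecord("stale_dag", now - timedelta(minutes=30)),
    ]
    # With a 10-minute threshold only the stale DAG is selected for deactivation.
    assert find_stale_dag_ids(rows, stale_dag_threshold=600) == {"stale_dag"}
```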
diff --git a/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py b/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
index ca685ae4e071..5d74507d6020 100644
--- a/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
+++ b/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
@@ -81,6 +81,8 @@ def upgrade():
ondelete="CASCADE",
)
batch_op.create_unique_constraint("dag_code_dag_version_id_uq", ["dag_version_id"])
+ batch_op.drop_column("last_updated")
+ batch_op.add_column(sa.Column("created_at", UtcDateTime(), nullable=False, default=timezone.utcnow))
with op.batch_alter_table(
"serialized_dag", recreate="always", naming_convention=naming_convention
@@ -100,6 +102,8 @@ def upgrade():
ondelete="CASCADE",
)
batch_op.create_unique_constraint("serialized_dag_dag_version_id_uq", ["dag_version_id"])
+ batch_op.drop_column("last_updated")
+ batch_op.add_column(sa.Column("created_at", UtcDateTime(), nullable=False, default=timezone.utcnow))
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False)))
@@ -140,6 +144,8 @@ def downgrade():
batch_op.drop_constraint(batch_op.f("dag_code_dag_version_id_fkey"), type_="foreignkey")
batch_op.drop_column("dag_version_id")
batch_op.create_primary_key("dag_code_pkey", ["fileloc_hash"])
+ batch_op.drop_column("created_at")
+ batch_op.add_column(sa.Column("last_updated", UtcDateTime(), nullable=False))
with op.batch_alter_table("serialized_dag", schema=None, naming_convention=naming_convention) as batch_op:
batch_op.drop_column("id")
@@ -149,6 +155,8 @@ def downgrade():
batch_op.create_primary_key("serialized_dag_pkey", ["dag_id"])
batch_op.drop_constraint(batch_op.f("serialized_dag_dag_version_id_fkey"), type_="foreignkey")
batch_op.drop_column("dag_version_id")
+ batch_op.drop_column("created_at")
+ batch_op.add_column(sa.Column("last_updated", UtcDateTime(), nullable=False))
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.add_column(sa.Column("dag_hash", sa.String(length=32), autoincrement=False, nullable=True))
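
The column swap above follows the usual Alembic batch pattern. A hedged, generic sketch (hypothetical table name, not this exact migration) of how such a swap is expressed, with the caveat that `default=` is only a Python-side insert default:

```python
# Hypothetical table name; illustrates only the batch_alter_table column swap.
import sqlalchemy as sa
from alembic import op

from airflow.utils import timezone
from airflow.utils.sqlalchemy import UtcDateTime


def upgrade():
    # recreate="always" rebuilds the table, which lets constraints change even on
    # SQLite. default=timezone.utcnow applies to new ORM/Core inserts only; it does
    # not backfill existing rows, so adding a NOT NULL column this way assumes the
    # table is empty or is repopulated by the surrounding migration code.
    with op.batch_alter_table("example_table", recreate="always") as batch_op:
        batch_op.drop_column("last_updated")
        batch_op.add_column(sa.Column("created_at", UtcDateTime(), nullable=False, default=timezone.utcnow))
```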
diff --git a/airflow/models/dag_version.py b/airflow/models/dag_version.py
index a0ce924caa8b..4bbd171b3f96 100644
--- a/airflow/models/dag_version.py
+++ b/airflow/models/dag_version.py
@@ -62,7 +62,7 @@ class DagVersion(Base):
)
dag_runs = relationship("DagRun", back_populates="dag_version", cascade="all, delete, delete-orphan")
task_instances = relationship("TaskInstance", back_populates="dag_version")
- created_at = Column(UtcDateTime, default=timezone.utcnow)
+ created_at = Column(UtcDateTime, nullable=False, default=timezone.utcnow)
__table_args__ = (
UniqueConstraint("dag_id", "version_number", name="dag_id_v_name_v_number_unique_constraint"),
diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py
index c78f6cafaa6f..c78457e6059f 100644
--- a/airflow/models/dagcode.py
+++ b/airflow/models/dagcode.py
@@ -18,16 +18,15 @@
import logging
import struct
-from typing import TYPE_CHECKING, Collection
+from typing import TYPE_CHECKING
import uuid6
-from sqlalchemy import BigInteger, Column, ForeignKey, String, Text, delete, select
+from sqlalchemy import BigInteger, Column, ForeignKey, String, Text, select
from sqlalchemy.dialects.mysql import MEDIUMTEXT
from sqlalchemy.orm import relationship
from sqlalchemy.sql.expression import literal
from sqlalchemy_utils import UUIDType
-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf
from airflow.exceptions import DagCodeNotFound
from airflow.models.base import Base
@@ -58,7 +57,7 @@ class DagCode(Base):
fileloc_hash = Column(BigInteger, nullable=False)
fileloc = Column(String(2000), nullable=False)
# The max length of fileloc exceeds the limit of indexing.
- last_updated = Column(UtcDateTime, nullable=False)
+ created_at = Column(UtcDateTime, nullable=False, default=timezone.utcnow)
source_code = Column(Text().with_variant(MEDIUMTEXT(), "mysql"), nullable=False)
dag_version_id = Column(
UUIDType(binary=False), ForeignKey("dag_version.id", ondelete="CASCADE"), nullable=False, unique=True
@@ -74,7 +73,7 @@ def __init__(self, dag_version, full_filepath: str, source_code: str | None = No
@classmethod
@provide_session
- def write_dag(cls, dag_version: DagVersion, fileloc: str, session: Session = NEW_SESSION) -> DagCode:
+ def write_code(cls, dag_version: DagVersion, fileloc: str, session: Session = NEW_SESSION) -> DagCode:
"""
Write code into database.
@@ -87,36 +86,6 @@ def write_dag(cls, dag_version: DagVersion, fileloc: str, session: Session = NEW
log.debug("DAG file %s written into DagCode table", fileloc)
return dag_code
- @classmethod
- @internal_api_call
- @provide_session
- def remove_deleted_code(
- cls,
- alive_dag_filelocs: Collection[str],
- processor_subdir: str,
- session: Session = NEW_SESSION,
- ) -> None:
- """
- Delete code not included in alive_dag_filelocs.
-
- :param alive_dag_filelocs: file paths of alive DAGs
- :param processor_subdir: dag processor subdir
- :param session: ORM Session
- """
- alive_fileloc_hashes = [cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
-
- log.debug("Deleting code from %s table ", cls.__tablename__)
-
- session.execute(
- delete(cls)
- .where(
- cls.fileloc_hash.notin_(alive_fileloc_hashes),
- cls.fileloc.notin_(alive_dag_filelocs),
- cls.fileloc.contains(processor_subdir),
- )
- .execution_options(synchronize_session="fetch")
- )
-
@classmethod
@provide_session
def has_dag(cls, fileloc: str, session: Session = NEW_SESSION) -> bool:
@@ -172,7 +141,7 @@ def _get_code_from_db(cls, fileloc, session: Session = NEW_SESSION) -> str:
dag_code = session.scalar(
select(cls)
.where(cls.fileloc_hash == cls.dag_fileloc_hash(fileloc))
- .order_by(cls.last_updated.desc())
+ .order_by(cls.created_at.desc())
.limit(1)
)
if not dag_code:
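
Because each parse now inserts a new dag_code row instead of updating one in place, lookups key on the newest `created_at`. A self-contained sketch of that "latest row wins" query, using an illustrative model rather than the DagCode class:

```python
# Illustrative model and data; only the ordering pattern mirrors the change above.
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String, Text, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Code(Base):
    __tablename__ = "code"
    id = Column(Integer, primary_key=True)
    fileloc = Column(String(2000), nullable=False)
    created_at = Column(DateTime, nullable=False)
    source_code = Column(Text, nullable=False)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(
        [
            Code(fileloc="dags/a.py", created_at=datetime(2025, 1, 1, 10, 0), source_code="v1"),
            Code(fileloc="dags/a.py", created_at=datetime(2025, 1, 1, 12, 0), source_code="v2"),
        ]
    )
    session.commit()

    # Newest row for the file wins: the order_by(created_at.desc()).limit(1) pattern.
    latest = session.scalar(
        select(Code).where(Code.fileloc == "dags/a.py").order_by(Code.created_at.desc()).limit(1)
    )
    assert latest.source_code == "v2"
```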
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 0d5667cd48fc..37c3478c8632 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -22,11 +22,11 @@
import logging
import zlib
from datetime import timedelta
-from typing import TYPE_CHECKING, Any, Collection
+from typing import TYPE_CHECKING, Any
import sqlalchemy_jsonfield
import uuid6
-from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, or_, select
+from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select
from sqlalchemy.orm import backref, foreign, relationship
from sqlalchemy.sql.expression import func, literal
from sqlalchemy_utils import UUIDType
@@ -83,7 +83,7 @@ class SerializedDagModel(Base):
dag_id = Column(String(ID_LEN), nullable=False)
_data = Column("data", sqlalchemy_jsonfield.JSONField(json=json), nullable=True)
_data_compressed = Column("data_compressed", LargeBinary, nullable=True)
- last_updated = Column(UtcDateTime, nullable=False)
+ created_at = Column(UtcDateTime, nullable=False, default=timezone.utcnow)
dag_hash = Column(String(32), nullable=False)
processor_subdir = Column(String(2000), nullable=True)
@@ -110,7 +110,6 @@ class SerializedDagModel(Base):
def __init__(self, dag: DAG, processor_subdir: str | None = None) -> None:
self.dag_id = dag.dag_id
- self.last_updated = timezone.utcnow()
self.processor_subdir = processor_subdir
dag_data = SerializedDAG.to_dict(dag)
@@ -186,7 +185,7 @@ def write_dag(
if session.scalar(
select(literal(True)).where(
cls.dag_id == dag.dag_id,
- (timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.last_updated,
+ (timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.created_at,
)
):
return False
@@ -196,7 +195,7 @@ def write_dag(
serialized_dag_db = session.execute(
select(cls.dag_hash, cls.processor_subdir)
.where(cls.dag_id == dag.dag_id)
- .order_by(cls.last_updated.desc())
+ .order_by(cls.created_at.desc())
).first()
if (
@@ -215,13 +214,12 @@ def write_dag(
new_serialized_dag.dag_version = dagv
session.add(new_serialized_dag)
log.debug("DAG: %s written to the DB", dag.dag_id)
-
- DagCode.write_dag(dagv, dag.fileloc, session=session)
+ DagCode.write_code(dagv, dag.fileloc, session=session)
return True
@classmethod
def latest_item_select_object(cls, dag_id):
- return select(cls).where(cls.dag_id == dag_id).order_by(cls.last_updated.desc()).limit(1)
+ return select(cls).where(cls.dag_id == dag_id).order_by(cls.created_at.desc()).limit(1)
@classmethod
@provide_session
@@ -237,7 +235,7 @@ def get_latest_serialized_dags(
"""
# Subquery to get the latest serdag per dag_id
latest_serdag_subquery = (
- session.query(cls.dag_id, func.max(cls.last_updated).label("last_updated"))
+ session.query(cls.dag_id, func.max(cls.created_at).label("created_at"))
.filter(cls.dag_id.in_(dag_ids))
.group_by(cls.dag_id)
.subquery()
@@ -246,7 +244,7 @@ def get_latest_serialized_dags(
select(cls)
.join(
latest_serdag_subquery,
- cls.last_updated == latest_serdag_subquery.c.last_updated,
+ cls.created_at == latest_serdag_subquery.c.created_at,
)
.where(cls.dag_id.in_(dag_ids))
).all()
@@ -262,7 +260,7 @@ def read_all_dags(cls, session: Session = NEW_SESSION) -> dict[str, SerializedDA
:returns: a dict of DAGs read from database
"""
latest_serialized_dag_subquery = (
- session.query(cls.dag_id, func.max(cls.last_updated).label("max_updated"))
+ session.query(cls.dag_id, func.max(cls.created_at).label("max_created"))
.group_by(cls.dag_id)
.subquery()
)
@@ -270,7 +268,7 @@ def read_all_dags(cls, session: Session = NEW_SESSION) -> dict[str, SerializedDA
select(cls).join(
latest_serialized_dag_subquery,
(cls.dag_id == latest_serialized_dag_subquery.c.dag_id)
- and (cls.last_updated == latest_serialized_dag_subquery.c.max_updated),
+ and (cls.created_at == latest_serialized_dag_subquery.c.max_created),
)
)
@@ -313,47 +311,6 @@ def dag(self) -> SerializedDAG:
raise ValueError("invalid or missing serialized DAG data")
return SerializedDAG.from_dict(data)
- @classmethod
- @provide_session
- def remove_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> None:
- """
- Delete a DAG with given dag_id.
-
- :param dag_id: dag_id to be deleted
- :param session: ORM Session.
- """
- session.execute(cls.__table__.delete().where(cls.dag_id == dag_id))
-
- @classmethod
- @internal_api_call
- @provide_session
- def remove_deleted_dags(
- cls,
- alive_dag_filelocs: Collection[str],
- processor_subdir: str | None = None,
- session: Session = NEW_SESSION,
- ) -> None:
- """
- Delete DAGs not included in alive_dag_filelocs.
-
- :param alive_dag_filelocs: file paths of alive DAGs
- :param processor_subdir: dag processor subdir
- :param session: ORM Session
- """
- log.debug(
- "Deleting Serialized DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__
- )
- # Deleting the DagModel cascade deletes the serialized Dag through the dag version relationship
- session.execute(
- DagModel.__table__.delete().where(
- DagModel.fileloc.notin_(alive_dag_filelocs),
- or_(
- DagModel.processor_subdir.is_(None),
- DagModel.processor_subdir == processor_subdir,
- ),
- )
- )
-
@classmethod
@provide_session
def has_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> bool:
@@ -418,7 +375,7 @@ def get_last_updated_datetime(cls, dag_id: str, session: Session = NEW_SESSION)
:param session: ORM Session
"""
return session.scalar(
- select(cls.last_updated).where(cls.dag_id == dag_id).order_by(cls.last_updated.desc()).limit(1)
+ select(cls.created_at).where(cls.dag_id == dag_id).order_by(cls.created_at.desc()).limit(1)
)
@classmethod
@@ -429,7 +386,7 @@ def get_max_last_updated_datetime(cls, session: Session = NEW_SESSION) -> dateti
:param session: ORM Session
"""
- return session.scalar(select(func.max(cls.last_updated)))
+ return session.scalar(select(func.max(cls.created_at)))
@classmethod
@provide_session
@@ -442,7 +399,7 @@ def get_latest_version_hash(cls, dag_id: str, session: Session = NEW_SESSION) ->
:return: DAG Hash, or None if the DAG is not found
"""
return session.scalar(
- select(cls.dag_hash).where(cls.dag_id == dag_id).order_by(cls.last_updated.desc()).limit(1)
+ select(cls.dag_hash).where(cls.dag_id == dag_id).order_by(cls.created_at.desc()).limit(1)
)
@classmethod
@@ -461,9 +418,9 @@ def get_latest_version_hash_and_updated_datetime(
:return: A tuple of DAG Hash and last updated datetime, or None if the DAG is not found
"""
return session.execute(
- select(cls.dag_hash, cls.last_updated)
+ select(cls.dag_hash, cls.created_at)
.where(cls.dag_id == dag_id)
- .order_by(cls.last_updated.desc())
+ .order_by(cls.created_at.desc())
.limit(1)
).one_or_none()
@@ -476,7 +433,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[
:param session: ORM Session
"""
latest_sdag_subquery = (
- session.query(cls.dag_id, func.max(cls.last_updated).label("max_updated"))
+ session.query(cls.dag_id, func.max(cls.created_at).label("max_created"))
.group_by(cls.dag_id)
.subquery()
)
@@ -485,7 +442,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[
select(cls.dag_id, func.json_extract(cls._data, "$.dag.dag_dependencies")).join(
latest_sdag_subquery,
(cls.dag_id == latest_sdag_subquery.c.dag_id)
- and (cls.last_updated == latest_sdag_subquery.c.max_updated),
+ and (cls.created_at == latest_sdag_subquery.c.max_created),
)
)
iterator = ((dag_id, json.loads(deps_data) if deps_data else []) for dag_id, deps_data in query)
@@ -494,7 +451,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[
select(cls.dag_id, func.json_extract_path(cls._data, "dag", "dag_dependencies")).join(
latest_sdag_subquery,
(cls.dag_id == latest_sdag_subquery.c.dag_id)
- and (cls.last_updated == latest_sdag_subquery.c.max_updated),
+ and (cls.created_at == latest_sdag_subquery.c.max_created),
)
)
return {dag_id: [DagDependency(**d) for d in (deps_data or [])] for dag_id, deps_data in iterator}
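
Several of the queries above switch the "latest serialized row per dag_id" subquery from `max(last_updated)` to `max(created_at)`. A self-contained sketch of that max-per-group pattern with an illustrative model; note the sketch joins the two conditions with `and_()`, since chaining SQLAlchemy expressions with a bare Python `and` would silently keep only one of them:

```python
# Illustrative model; only the max-per-group join pattern mirrors the queries above.
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String, and_, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class SerDag(Base):
    __tablename__ = "serdag"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String(250), nullable=False)
    version = Column(Integer, nullable=False)
    created_at = Column(DateTime, nullable=False)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(
        [
            SerDag(dag_id="dag_a", version=1, created_at=datetime(2025, 1, 1, 8, 0)),
            SerDag(dag_id="dag_a", version=2, created_at=datetime(2025, 1, 1, 9, 0)),
            SerDag(dag_id="dag_b", version=1, created_at=datetime(2025, 1, 1, 8, 30)),
        ]
    )
    session.commit()

    # Subquery: newest created_at per dag_id; join back to fetch the full rows.
    latest = (
        select(SerDag.dag_id, func.max(SerDag.created_at).label("created_at"))
        .group_by(SerDag.dag_id)
        .subquery()
    )
    rows = session.scalars(
        select(SerDag).join(
            latest,
            and_(SerDag.dag_id == latest.c.dag_id, SerDag.created_at == latest.c.created_at),
        )
    ).all()
    assert {(r.dag_id, r.version) for r in rows} == {("dag_a", 2), ("dag_b", 1)}
```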
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index 8960e963a752..349201964219 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-2f5a0c99e82990e669d975bf366dc82202c5d460063aaebff3e10abd2acc15f9
\ No newline at end of file
+1bf2f91dc87440cdbf7d1defb10a4693b47280b9a9b914dae3f1866eb2169629
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index 26858f7a5400..41fe37adaaab 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1539,6 +1539,7 @@
 [Regenerated ERD text omitted: the dag_version.created_at [TIMESTAMP] attribute gains a NOT NULL marker.]
@@ -1758,94 +1759,94 @@
 [Regenerated ERD text omitted: the dag_code and serialized_dag entities both drop last_updated [TIMESTAMP] NOT NULL and gain created_at [TIMESTAMP] NOT NULL; the remaining columns and the 1 -- 0..N relationships from dag_version are unchanged.]
diff --git a/tests/api_internal/endpoints/test_rpc_api_endpoint.py b/tests/api_internal/endpoints/test_rpc_api_endpoint.py
index 5813b28790db..6f7b151aebff 100644
--- a/tests/api_internal/endpoints/test_rpc_api_endpoint.py
+++ b/tests/api_internal/endpoints/test_rpc_api_endpoint.py
@@ -101,7 +101,7 @@ def test_initialize_method_map(self):
from airflow.api_internal.endpoints.rpc_api_endpoint import initialize_method_map
method_map = initialize_method_map()
- assert len(method_map) > 70
+ assert len(method_map) > 69
@pytest.mark.parametrize(
"input_params, method_result, result_cmp_func, method_params",
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 8c9096c6849e..0aa318b52ead 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -172,10 +172,7 @@ def set_instance_attrs(self) -> Generator:
# Speed up some tests by not running the tasks, just look at what we
# enqueue!
self.null_exec: MockExecutor | None = MockExecutor()
- # Since we don't want to store the code for the DAG defined in this file
- with patch("airflow.models.serialized_dag.SerializedDagModel.remove_deleted_dags"):
- yield
-
+ yield
self.null_exec = None
@pytest.fixture
diff --git a/tests/models/test_dagcode.py b/tests/models/test_dagcode.py
index fd7d761f9103..fc1dfa330f7e 100644
--- a/tests/models/test_dagcode.py
+++ b/tests/models/test_dagcode.py
@@ -95,7 +95,7 @@ def _compare_example_dags(self, example_dags):
session.query(DagCode.fileloc, DagCode.fileloc_hash, DagCode.source_code)
.filter(DagCode.fileloc == dag.fileloc)
.filter(DagCode.fileloc_hash == dag_fileloc_hash)
- .order_by(DagCode.last_updated.desc())
+ .order_by(DagCode.created_at.desc())
.limit(1)
.one()
)
@@ -129,7 +129,7 @@ def test_db_code_created_on_serdag_change(self, session):
result = (
session.query(DagCode)
.filter(DagCode.fileloc == example_dag.fileloc)
- .order_by(DagCode.last_updated.desc())
+ .order_by(DagCode.created_at.desc())
.limit(1)
.one()
)
@@ -146,14 +146,14 @@ def test_db_code_created_on_serdag_change(self, session):
new_result = (
session.query(DagCode)
.filter(DagCode.fileloc == example_dag.fileloc)
- .order_by(DagCode.last_updated.desc())
+ .order_by(DagCode.created_at.desc())
.limit(1)
.one()
)
assert new_result.fileloc == example_dag.fileloc
assert new_result.source_code != result.source_code
- assert new_result.last_updated > result.last_updated
+ assert new_result.created_at > result.created_at
def test_has_dag(self, dag_maker):
"""Test has_dag method."""
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index d0bfe37a69cc..599cb1396d71 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -109,7 +109,7 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self):
s_dag_1 = SDM.get(example_bash_op_dag.dag_id)
assert s_dag_1.dag_hash == s_dag.dag_hash
- assert s_dag.last_updated == s_dag_1.last_updated
+ assert s_dag.created_at == s_dag_1.created_at
assert dag_updated is False
# Update DAG
@@ -119,7 +119,7 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self):
dag_updated = SDM.write_dag(dag=example_bash_op_dag)
s_dag_2 = SDM.get(example_bash_op_dag.dag_id)
- assert s_dag.last_updated != s_dag_2.last_updated
+ assert s_dag.created_at != s_dag_2.created_at
assert s_dag.dag_hash != s_dag_2.dag_hash
assert s_dag_2.data["dag"]["tags"] == ["example", "example2", "new_tag"]
assert dag_updated is True
@@ -141,7 +141,7 @@ def test_serialized_dag_is_updated_if_processor_subdir_changed(self):
s_dag_1 = SDM.get(example_bash_op_dag.dag_id)
assert s_dag_1.dag_hash == s_dag.dag_hash
- assert s_dag.last_updated == s_dag_1.last_updated
+ assert s_dag.created_at == s_dag_1.created_at
assert dag_updated is False
session.flush()
@@ -177,27 +177,6 @@ def test_read_all_dags_only_picks_the_latest_serdags(self, session):
# assert only the latest SDM is returned
assert len(sdags) != len(serialized_dags2)
- @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
- def test_remove_dags_by_id(self):
- """DAGs can be removed from database."""
- example_dags_list = list(self._write_example_dags().values())
- # Tests removing by dag_id.
- dag_removed_by_id = example_dags_list[0]
- SDM.remove_dag(dag_removed_by_id.dag_id)
- assert not SDM.has_dag(dag_removed_by_id.dag_id)
-
- @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
- def test_remove_dags_by_filepath(self):
- """DAGs can be removed from database."""
- example_dags_list = list(self._write_example_dags().values())
- # Tests removing by file path.
- dag_removed_by_file = example_dags_list[0]
- # remove repeated files for those DAGs that define multiple dags in the same file (set comprehension)
- example_dag_files = list({dag.fileloc for dag in example_dags_list})
- example_dag_files.remove(dag_removed_by_file.fileloc)
- SDM.remove_deleted_dags(example_dag_files, processor_subdir="/tmp/test")
- assert not SDM.has_dag(dag_removed_by_file.dag_id)
-
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_bulk_sync_to_db(self):
dags = [