From 2db0d11c1568b2db39c8ff40c44a0a295419fa22 Mon Sep 17 00:00:00 2001
From: Ephraim Anierobi
Date: Sat, 16 Nov 2024 06:28:17 +0100
Subject: [PATCH] Refactor SerializedDagModel and DagCode for dag versioning
 (#43821)

* Refactor SerializedDagModel and DagCode for dag versioning

Now that we have dag versioning, the SerializedDagModel and DagCode
objects should no longer be deleted directly. Deletion starts with the
DagModel, which cascades to the DagVersion and from there to the DagCode
and SerializedDagModel. These models are also no longer updated in place;
a new row is added instead, so the last_updated column is renamed to
created_at.

* fixup! Refactor SerializedDagModel and DagCode for dag versioning

* fixup! fixup! Refactor SerializedDagModel and DagCode for dag versioning

* update rpc test since remove_deleted_dag has been removed

* Apply suggestions from code review
---
 airflow/api/common/delete_dag.py              |   6 -
 .../endpoints/rpc_api_endpoint.py             |   3 -
 airflow/dag_processing/manager.py             |   6 +-
 .../versions/0047_3_0_0_add_dag_versioning.py |   8 +
 airflow/models/dag_version.py                 |   2 +-
 airflow/models/dagcode.py                     |  41 +----
 airflow/models/serialized_dag.py              |  81 +++-------
 docs/apache-airflow/img/airflow_erd.sha256    |   2 +-
 docs/apache-airflow/img/airflow_erd.svg       | 151 +++++++++---------
 .../endpoints/test_rpc_api_endpoint.py        |   2 +-
 tests/jobs/test_scheduler_job.py              |   5 +-
 tests/models/test_dagcode.py                  |   8 +-
 tests/models/test_serialized_dag.py           |  27 +---
 13 files changed, 120 insertions(+), 222 deletions(-)

diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index 11b046648c5a..1e9d4a80dbd6 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -28,7 +28,6 @@
 from airflow.exceptions import AirflowException, DagNotFound
 from airflow.models import DagModel
 from airflow.models.errors import ParseImportError
-from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils.db import get_sqla_model_classes
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import TaskInstanceState
@@ -64,11 +63,6 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
     if dag is None:
         raise DagNotFound(f"Dag id {dag_id} not found")

-    # Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
-    # There may be a lag, so explicitly removes serialized DAG here.
- if SerializedDagModel.has_dag(dag_id=dag_id, session=session): - SerializedDagModel.remove_dag(dag_id=dag_id, session=session) - count = 0 for model in get_sqla_model_classes(): diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py index 1cdd2536e135..ea8d59e29ad3 100644 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py @@ -37,7 +37,6 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.jobs.job import Job, most_recent_job -from airflow.models.dagcode import DagCode from airflow.models.taskinstance import _record_task_map_for_downstreams from airflow.models.xcom_arg import _get_task_map_length from airflow.sensors.base import _orig_start_date @@ -94,7 +93,6 @@ def initialize_method_map() -> dict[str, Callable]: _xcom_pull, _record_task_map_for_downstreams, trigger_dag, - DagCode.remove_deleted_code, DagModel.deactivate_deleted_dags, DagModel.get_paused_dag_ids, DagModel.get_current, @@ -138,7 +136,6 @@ def initialize_method_map() -> dict[str, Callable]: DagRun._get_log_template, RenderedTaskInstanceFields._update_runtime_evaluated_template_fields, SerializedDagModel.get_serialized_dag, - SerializedDagModel.remove_deleted_dags, SkipMixin._skip, SkipMixin._skip_all_except, TaskInstance._check_and_change_state_before_execution, diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 597b09311928..3056a7fc1f4d 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -507,11 +507,7 @@ def deactivate_stale_dags( stale_dag_threshold: int, session: Session = NEW_SESSION, ): - """ - Detect DAGs which are no longer present in files. - - Deactivate them and remove them in the serialized_dag table. 
- """ + """Detect and deactivate DAGs which are no longer present in files.""" to_deactivate = set() query = select(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time).where(DagModel.is_active) standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor") diff --git a/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py b/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py index ca685ae4e071..5d74507d6020 100644 --- a/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py +++ b/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py @@ -81,6 +81,8 @@ def upgrade(): ondelete="CASCADE", ) batch_op.create_unique_constraint("dag_code_dag_version_id_uq", ["dag_version_id"]) + batch_op.drop_column("last_updated") + batch_op.add_column(sa.Column("created_at", UtcDateTime(), nullable=False, default=timezone.utcnow)) with op.batch_alter_table( "serialized_dag", recreate="always", naming_convention=naming_convention @@ -100,6 +102,8 @@ def upgrade(): ondelete="CASCADE", ) batch_op.create_unique_constraint("serialized_dag_dag_version_id_uq", ["dag_version_id"]) + batch_op.drop_column("last_updated") + batch_op.add_column(sa.Column("created_at", UtcDateTime(), nullable=False, default=timezone.utcnow)) with op.batch_alter_table("task_instance", schema=None) as batch_op: batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False))) @@ -140,6 +144,8 @@ def downgrade(): batch_op.drop_constraint(batch_op.f("dag_code_dag_version_id_fkey"), type_="foreignkey") batch_op.drop_column("dag_version_id") batch_op.create_primary_key("dag_code_pkey", ["fileloc_hash"]) + batch_op.drop_column("created_at") + batch_op.add_column(sa.Column("last_updated", UtcDateTime(), nullable=False)) with op.batch_alter_table("serialized_dag", schema=None, naming_convention=naming_convention) as batch_op: batch_op.drop_column("id") @@ -149,6 +155,8 @@ def downgrade(): batch_op.create_primary_key("serialized_dag_pkey", ["dag_id"]) batch_op.drop_constraint(batch_op.f("serialized_dag_dag_version_id_fkey"), type_="foreignkey") batch_op.drop_column("dag_version_id") + batch_op.drop_column("created_at") + batch_op.add_column(sa.Column("last_updated", UtcDateTime(), nullable=False)) with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.add_column(sa.Column("dag_hash", sa.String(length=32), autoincrement=False, nullable=True)) diff --git a/airflow/models/dag_version.py b/airflow/models/dag_version.py index a0ce924caa8b..4bbd171b3f96 100644 --- a/airflow/models/dag_version.py +++ b/airflow/models/dag_version.py @@ -62,7 +62,7 @@ class DagVersion(Base): ) dag_runs = relationship("DagRun", back_populates="dag_version", cascade="all, delete, delete-orphan") task_instances = relationship("TaskInstance", back_populates="dag_version") - created_at = Column(UtcDateTime, default=timezone.utcnow) + created_at = Column(UtcDateTime, nullable=False, default=timezone.utcnow) __table_args__ = ( UniqueConstraint("dag_id", "version_number", name="dag_id_v_name_v_number_unique_constraint"), diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py index c78f6cafaa6f..c78457e6059f 100644 --- a/airflow/models/dagcode.py +++ b/airflow/models/dagcode.py @@ -18,16 +18,15 @@ import logging import struct -from typing import TYPE_CHECKING, Collection +from typing import TYPE_CHECKING import uuid6 -from sqlalchemy import BigInteger, Column, ForeignKey, String, Text, delete, select +from sqlalchemy import BigInteger, Column, ForeignKey, String, Text, select from 
sqlalchemy.dialects.mysql import MEDIUMTEXT from sqlalchemy.orm import relationship from sqlalchemy.sql.expression import literal from sqlalchemy_utils import UUIDType -from airflow.api_internal.internal_api_call import internal_api_call from airflow.configuration import conf from airflow.exceptions import DagCodeNotFound from airflow.models.base import Base @@ -58,7 +57,7 @@ class DagCode(Base): fileloc_hash = Column(BigInteger, nullable=False) fileloc = Column(String(2000), nullable=False) # The max length of fileloc exceeds the limit of indexing. - last_updated = Column(UtcDateTime, nullable=False) + created_at = Column(UtcDateTime, nullable=False, default=timezone.utcnow) source_code = Column(Text().with_variant(MEDIUMTEXT(), "mysql"), nullable=False) dag_version_id = Column( UUIDType(binary=False), ForeignKey("dag_version.id", ondelete="CASCADE"), nullable=False, unique=True @@ -74,7 +73,7 @@ def __init__(self, dag_version, full_filepath: str, source_code: str | None = No @classmethod @provide_session - def write_dag(cls, dag_version: DagVersion, fileloc: str, session: Session = NEW_SESSION) -> DagCode: + def write_code(cls, dag_version: DagVersion, fileloc: str, session: Session = NEW_SESSION) -> DagCode: """ Write code into database. @@ -87,36 +86,6 @@ def write_dag(cls, dag_version: DagVersion, fileloc: str, session: Session = NEW log.debug("DAG file %s written into DagCode table", fileloc) return dag_code - @classmethod - @internal_api_call - @provide_session - def remove_deleted_code( - cls, - alive_dag_filelocs: Collection[str], - processor_subdir: str, - session: Session = NEW_SESSION, - ) -> None: - """ - Delete code not included in alive_dag_filelocs. - - :param alive_dag_filelocs: file paths of alive DAGs - :param processor_subdir: dag processor subdir - :param session: ORM Session - """ - alive_fileloc_hashes = [cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs] - - log.debug("Deleting code from %s table ", cls.__tablename__) - - session.execute( - delete(cls) - .where( - cls.fileloc_hash.notin_(alive_fileloc_hashes), - cls.fileloc.notin_(alive_dag_filelocs), - cls.fileloc.contains(processor_subdir), - ) - .execution_options(synchronize_session="fetch") - ) - @classmethod @provide_session def has_dag(cls, fileloc: str, session: Session = NEW_SESSION) -> bool: @@ -172,7 +141,7 @@ def _get_code_from_db(cls, fileloc, session: Session = NEW_SESSION) -> str: dag_code = session.scalar( select(cls) .where(cls.fileloc_hash == cls.dag_fileloc_hash(fileloc)) - .order_by(cls.last_updated.desc()) + .order_by(cls.created_at.desc()) .limit(1) ) if not dag_code: diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 0d5667cd48fc..37c3478c8632 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -22,11 +22,11 @@ import logging import zlib from datetime import timedelta -from typing import TYPE_CHECKING, Any, Collection +from typing import TYPE_CHECKING, Any import sqlalchemy_jsonfield import uuid6 -from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, or_, select +from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select from sqlalchemy.orm import backref, foreign, relationship from sqlalchemy.sql.expression import func, literal from sqlalchemy_utils import UUIDType @@ -83,7 +83,7 @@ class SerializedDagModel(Base): dag_id = Column(String(ID_LEN), nullable=False) _data = Column("data", sqlalchemy_jsonfield.JSONField(json=json), nullable=True) _data_compressed = 
Column("data_compressed", LargeBinary, nullable=True) - last_updated = Column(UtcDateTime, nullable=False) + created_at = Column(UtcDateTime, nullable=False, default=timezone.utcnow) dag_hash = Column(String(32), nullable=False) processor_subdir = Column(String(2000), nullable=True) @@ -110,7 +110,6 @@ class SerializedDagModel(Base): def __init__(self, dag: DAG, processor_subdir: str | None = None) -> None: self.dag_id = dag.dag_id - self.last_updated = timezone.utcnow() self.processor_subdir = processor_subdir dag_data = SerializedDAG.to_dict(dag) @@ -186,7 +185,7 @@ def write_dag( if session.scalar( select(literal(True)).where( cls.dag_id == dag.dag_id, - (timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.last_updated, + (timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.created_at, ) ): return False @@ -196,7 +195,7 @@ def write_dag( serialized_dag_db = session.execute( select(cls.dag_hash, cls.processor_subdir) .where(cls.dag_id == dag.dag_id) - .order_by(cls.last_updated.desc()) + .order_by(cls.created_at.desc()) ).first() if ( @@ -215,13 +214,12 @@ def write_dag( new_serialized_dag.dag_version = dagv session.add(new_serialized_dag) log.debug("DAG: %s written to the DB", dag.dag_id) - - DagCode.write_dag(dagv, dag.fileloc, session=session) + DagCode.write_code(dagv, dag.fileloc, session=session) return True @classmethod def latest_item_select_object(cls, dag_id): - return select(cls).where(cls.dag_id == dag_id).order_by(cls.last_updated.desc()).limit(1) + return select(cls).where(cls.dag_id == dag_id).order_by(cls.created_at.desc()).limit(1) @classmethod @provide_session @@ -237,7 +235,7 @@ def get_latest_serialized_dags( """ # Subquery to get the latest serdag per dag_id latest_serdag_subquery = ( - session.query(cls.dag_id, func.max(cls.last_updated).label("last_updated")) + session.query(cls.dag_id, func.max(cls.created_at).label("created_at")) .filter(cls.dag_id.in_(dag_ids)) .group_by(cls.dag_id) .subquery() @@ -246,7 +244,7 @@ def get_latest_serialized_dags( select(cls) .join( latest_serdag_subquery, - cls.last_updated == latest_serdag_subquery.c.last_updated, + cls.created_at == latest_serdag_subquery.c.created_at, ) .where(cls.dag_id.in_(dag_ids)) ).all() @@ -262,7 +260,7 @@ def read_all_dags(cls, session: Session = NEW_SESSION) -> dict[str, SerializedDA :returns: a dict of DAGs read from database """ latest_serialized_dag_subquery = ( - session.query(cls.dag_id, func.max(cls.last_updated).label("max_updated")) + session.query(cls.dag_id, func.max(cls.created_at).label("max_created")) .group_by(cls.dag_id) .subquery() ) @@ -270,7 +268,7 @@ def read_all_dags(cls, session: Session = NEW_SESSION) -> dict[str, SerializedDA select(cls).join( latest_serialized_dag_subquery, (cls.dag_id == latest_serialized_dag_subquery.c.dag_id) - and (cls.last_updated == latest_serialized_dag_subquery.c.max_updated), + and (cls.created_at == latest_serialized_dag_subquery.c.max_created), ) ) @@ -313,47 +311,6 @@ def dag(self) -> SerializedDAG: raise ValueError("invalid or missing serialized DAG data") return SerializedDAG.from_dict(data) - @classmethod - @provide_session - def remove_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> None: - """ - Delete a DAG with given dag_id. - - :param dag_id: dag_id to be deleted - :param session: ORM Session. 
- """ - session.execute(cls.__table__.delete().where(cls.dag_id == dag_id)) - - @classmethod - @internal_api_call - @provide_session - def remove_deleted_dags( - cls, - alive_dag_filelocs: Collection[str], - processor_subdir: str | None = None, - session: Session = NEW_SESSION, - ) -> None: - """ - Delete DAGs not included in alive_dag_filelocs. - - :param alive_dag_filelocs: file paths of alive DAGs - :param processor_subdir: dag processor subdir - :param session: ORM Session - """ - log.debug( - "Deleting Serialized DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__ - ) - # Deleting the DagModel cascade deletes the serialized Dag through the dag version relationship - session.execute( - DagModel.__table__.delete().where( - DagModel.fileloc.notin_(alive_dag_filelocs), - or_( - DagModel.processor_subdir.is_(None), - DagModel.processor_subdir == processor_subdir, - ), - ) - ) - @classmethod @provide_session def has_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> bool: @@ -418,7 +375,7 @@ def get_last_updated_datetime(cls, dag_id: str, session: Session = NEW_SESSION) :param session: ORM Session """ return session.scalar( - select(cls.last_updated).where(cls.dag_id == dag_id).order_by(cls.last_updated.desc()).limit(1) + select(cls.created_at).where(cls.dag_id == dag_id).order_by(cls.created_at.desc()).limit(1) ) @classmethod @@ -429,7 +386,7 @@ def get_max_last_updated_datetime(cls, session: Session = NEW_SESSION) -> dateti :param session: ORM Session """ - return session.scalar(select(func.max(cls.last_updated))) + return session.scalar(select(func.max(cls.created_at))) @classmethod @provide_session @@ -442,7 +399,7 @@ def get_latest_version_hash(cls, dag_id: str, session: Session = NEW_SESSION) -> :return: DAG Hash, or None if the DAG is not found """ return session.scalar( - select(cls.dag_hash).where(cls.dag_id == dag_id).order_by(cls.last_updated.desc()).limit(1) + select(cls.dag_hash).where(cls.dag_id == dag_id).order_by(cls.created_at.desc()).limit(1) ) @classmethod @@ -461,9 +418,9 @@ def get_latest_version_hash_and_updated_datetime( :return: A tuple of DAG Hash and last updated datetime, or None if the DAG is not found """ return session.execute( - select(cls.dag_hash, cls.last_updated) + select(cls.dag_hash, cls.created_at) .where(cls.dag_id == dag_id) - .order_by(cls.last_updated.desc()) + .order_by(cls.created_at.desc()) .limit(1) ).one_or_none() @@ -476,7 +433,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[ :param session: ORM Session """ latest_sdag_subquery = ( - session.query(cls.dag_id, func.max(cls.last_updated).label("max_updated")) + session.query(cls.dag_id, func.max(cls.created_at).label("max_created")) .group_by(cls.dag_id) .subquery() ) @@ -485,7 +442,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[ select(cls.dag_id, func.json_extract(cls._data, "$.dag.dag_dependencies")).join( latest_sdag_subquery, (cls.dag_id == latest_sdag_subquery.c.dag_id) - and (cls.last_updated == latest_sdag_subquery.c.max_updated), + and (cls.created_at == latest_sdag_subquery.c.max_created), ) ) iterator = ((dag_id, json.loads(deps_data) if deps_data else []) for dag_id, deps_data in query) @@ -494,7 +451,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[ select(cls.dag_id, func.json_extract_path(cls._data, "dag", "dag_dependencies")).join( latest_sdag_subquery, (cls.dag_id == latest_sdag_subquery.c.dag_id) - and (cls.last_updated == 
latest_sdag_subquery.c.max_updated),
+                    and (cls.created_at == latest_sdag_subquery.c.max_created),
                 )
             )
         return {dag_id: [DagDependency(**d) for d in (deps_data or [])] for dag_id, deps_data in iterator}
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index 8960e963a752..349201964219 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-2f5a0c99e82990e669d975bf366dc82202c5d460063aaebff3e10abd2acc15f9
\ No newline at end of file
+1bf2f91dc87440cdbf7d1defb10a4693b47280b9a9b914dae3f1866eb2169629
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index 26858f7a5400..41fe37adaaab 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
[regenerated ERD diagram; only the SVG text content survives here: dag_version.created_at is now NOT NULL, and the dag_code and serialized_dag entities list created_at [TIMESTAMP] NOT NULL in place of last_updated, with the 0..N-to-1 edges from dag_version redrawn accordingly]
diff --git a/tests/api_internal/endpoints/test_rpc_api_endpoint.py b/tests/api_internal/endpoints/test_rpc_api_endpoint.py
index 5813b28790db..6f7b151aebff 100644
--- a/tests/api_internal/endpoints/test_rpc_api_endpoint.py
+++ b/tests/api_internal/endpoints/test_rpc_api_endpoint.py
@@ -101,7 +101,7 @@ def test_initialize_method_map(self):
         from airflow.api_internal.endpoints.rpc_api_endpoint import initialize_method_map

         method_map = initialize_method_map()
-        assert len(method_map) > 70
+        assert len(method_map) > 69

     @pytest.mark.parametrize(
         "input_params, method_result, result_cmp_func, method_params",
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 8c9096c6849e..0aa318b52ead 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -172,10 +172,7 @@ def set_instance_attrs(self) -> Generator:
         # Speed up some tests by not running the tasks, just look at what we
         # enqueue!
self.null_exec: MockExecutor | None = MockExecutor() - # Since we don't want to store the code for the DAG defined in this file - with patch("airflow.models.serialized_dag.SerializedDagModel.remove_deleted_dags"): - yield - + yield self.null_exec = None @pytest.fixture diff --git a/tests/models/test_dagcode.py b/tests/models/test_dagcode.py index fd7d761f9103..fc1dfa330f7e 100644 --- a/tests/models/test_dagcode.py +++ b/tests/models/test_dagcode.py @@ -95,7 +95,7 @@ def _compare_example_dags(self, example_dags): session.query(DagCode.fileloc, DagCode.fileloc_hash, DagCode.source_code) .filter(DagCode.fileloc == dag.fileloc) .filter(DagCode.fileloc_hash == dag_fileloc_hash) - .order_by(DagCode.last_updated.desc()) + .order_by(DagCode.created_at.desc()) .limit(1) .one() ) @@ -129,7 +129,7 @@ def test_db_code_created_on_serdag_change(self, session): result = ( session.query(DagCode) .filter(DagCode.fileloc == example_dag.fileloc) - .order_by(DagCode.last_updated.desc()) + .order_by(DagCode.created_at.desc()) .limit(1) .one() ) @@ -146,14 +146,14 @@ def test_db_code_created_on_serdag_change(self, session): new_result = ( session.query(DagCode) .filter(DagCode.fileloc == example_dag.fileloc) - .order_by(DagCode.last_updated.desc()) + .order_by(DagCode.created_at.desc()) .limit(1) .one() ) assert new_result.fileloc == example_dag.fileloc assert new_result.source_code != result.source_code - assert new_result.last_updated > result.last_updated + assert new_result.created_at > result.created_at def test_has_dag(self, dag_maker): """Test has_dag method.""" diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index d0bfe37a69cc..599cb1396d71 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -109,7 +109,7 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self): s_dag_1 = SDM.get(example_bash_op_dag.dag_id) assert s_dag_1.dag_hash == s_dag.dag_hash - assert s_dag.last_updated == s_dag_1.last_updated + assert s_dag.created_at == s_dag_1.created_at assert dag_updated is False # Update DAG @@ -119,7 +119,7 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self): dag_updated = SDM.write_dag(dag=example_bash_op_dag) s_dag_2 = SDM.get(example_bash_op_dag.dag_id) - assert s_dag.last_updated != s_dag_2.last_updated + assert s_dag.created_at != s_dag_2.created_at assert s_dag.dag_hash != s_dag_2.dag_hash assert s_dag_2.data["dag"]["tags"] == ["example", "example2", "new_tag"] assert dag_updated is True @@ -141,7 +141,7 @@ def test_serialized_dag_is_updated_if_processor_subdir_changed(self): s_dag_1 = SDM.get(example_bash_op_dag.dag_id) assert s_dag_1.dag_hash == s_dag.dag_hash - assert s_dag.last_updated == s_dag_1.last_updated + assert s_dag.created_at == s_dag_1.created_at assert dag_updated is False session.flush() @@ -177,27 +177,6 @@ def test_read_all_dags_only_picks_the_latest_serdags(self, session): # assert only the latest SDM is returned assert len(sdags) != len(serialized_dags2) - @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode - def test_remove_dags_by_id(self): - """DAGs can be removed from database.""" - example_dags_list = list(self._write_example_dags().values()) - # Tests removing by dag_id. 
- dag_removed_by_id = example_dags_list[0] - SDM.remove_dag(dag_removed_by_id.dag_id) - assert not SDM.has_dag(dag_removed_by_id.dag_id) - - @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode - def test_remove_dags_by_filepath(self): - """DAGs can be removed from database.""" - example_dags_list = list(self._write_example_dags().values()) - # Tests removing by file path. - dag_removed_by_file = example_dags_list[0] - # remove repeated files for those DAGs that define multiple dags in the same file (set comprehension) - example_dag_files = list({dag.fileloc for dag in example_dags_list}) - example_dag_files.remove(dag_removed_by_file.fileloc) - SDM.remove_deleted_dags(example_dag_files, processor_subdir="/tmp/test") - assert not SDM.has_dag(dag_removed_by_file.dag_id) - @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_bulk_sync_to_db(self): dags = [
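
The net effect of this patch is an append-only persistence model: each reparse of a changed DAG adds a new DagVersion row with its own DagCode and SerializedDagModel rows, readers take the newest row by ordering on created_at, and cleanup happens only by deleting the DagModel, which cascades to DagVersion and from there to DagCode and SerializedDagModel. The sketch below is illustrative only and not part of the patch (the helper names latest_serialized_dag and drop_dag are invented for the example); it assumes a checkout carrying this change and an initialized Airflow metadata database.

from __future__ import annotations

# Illustrative sketch of the append-only versioning model introduced by this patch.
from sqlalchemy import select

from airflow.models import DagModel
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.session import create_session


def latest_serialized_dag(dag_id: str) -> SerializedDagModel | None:
    """Newest row wins: rows are never updated, so order by created_at (formerly last_updated)."""
    with create_session() as session:
        return session.scalar(
            select(SerializedDagModel)
            .where(SerializedDagModel.dag_id == dag_id)
            .order_by(SerializedDagModel.created_at.desc())
            .limit(1)
        )


def drop_dag(dag_id: str) -> None:
    """Deleting the DagModel is now the single cleanup entry point.

    Per this patch, the delete cascades DagModel -> DagVersion -> (DagCode, SerializedDagModel),
    which is why remove_dag, remove_deleted_dags and remove_deleted_code were removed.
    """
    with create_session() as session:
        dag = session.get(DagModel, dag_id)
        if dag is not None:
            session.delete(dag)

Note that write_dag still rate-limits new rows via min_update_interval; it now compares against created_at rather than last_updated.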