
Revert "Handle Example dags case when checking for missing files (apa…
Browse files Browse the repository at this point in the history
…che#41856) (apache#41874)" (apache#42217)

This reverts commit 5e962ed.
ephraimbuddy authored Sep 13, 2024
1 parent 2a76973 commit 103e65b
Showing 2 changed files with 48 additions and 52 deletions.
11 changes: 3 additions & 8 deletions airflow/dag_processing/manager.py
@@ -41,7 +41,6 @@
 from tabulate import tabulate
 
 import airflow.models
-from airflow import example_dags
 from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
 from airflow.configuration import conf
@@ -70,8 +69,6 @@
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import prohibit_commit, with_row_locks
 
-example_dag_folder = next(iter(example_dags.__path__))
-
 if TYPE_CHECKING:
     from multiprocessing.connection import Connection as MultiprocessingConnection
 
@@ -530,11 +527,9 @@ def deactivate_stale_dags(
 
         for dag in dags_parsed:
             # When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder,
-            # DAGs from the previous DAGs folder will be marked as stale. We also need to handle example dags
-            # differently. Note that this change has no impact on standalone DAG processors.
-            dag_not_in_current_dag_folder = (
-                not os.path.commonpath([dag.fileloc, example_dag_folder]) == example_dag_folder
-            ) and (os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory)
+            # DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact
+            # on standalone DAG processors.
+            dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory
             # The largest valid difference between a DagFileStat's last_finished_time and a DAG's
             # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
             # no longer present in the file. We have a stale_dag_threshold configured to prevent a
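
The restored logic relies on os.path.commonpath alone: a DAG counts as outside the current DAGs folder whenever the common prefix of its fileloc and dag_directory is not dag_directory itself. A minimal standalone sketch of that comparison (the helper name and paths are illustrative, not from the Airflow codebase):

    import os

    def dag_not_in_current_dag_folder(fileloc: str, dag_directory: str) -> bool:
        # Mirrors the restored check: True unless dag_directory is a
        # path prefix of the DAG's file location.
        return os.path.commonpath([fileloc, dag_directory]) != dag_directory

    DAG_DIRECTORY = "/opt/airflow/dags"
    print(dag_not_in_current_dag_folder("/opt/airflow/dags/etl.py", DAG_DIRECTORY))      # False -> stays active
    print(dag_not_in_current_dag_folder("/opt/airflow/old_dags/etl.py", DAG_DIRECTORY))  # True -> marked stale
    # With the example-dags special case reverted, example DAGs shipped inside the
    # airflow package also land on the True side, since they live outside dag_directory.
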
89 changes: 45 additions & 44 deletions tests/dag_processing/test_job_runner.py
@@ -773,57 +773,58 @@ def test_scan_stale_dags_when_dag_folder_change(self):
         def get_dag_string(filename) -> str:
             return open(TEST_DAG_FOLDER / filename).read()
 
-        def add_dag_to_db(file_path, dag_id, processor_subdir):
-            dagbag = DagBag(file_path, read_dags_from_db=False)
-            dag = dagbag.get_dag(dag_id)
-            dag.fileloc = file_path
-            dag.last_parsed_time = timezone.utcnow()
-            dag.sync_to_db(processor_subdir=processor_subdir)
+        with tempfile.TemporaryDirectory() as tmpdir:
+            old_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py")
+            old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
+            old_dag_file.flush()
+            new_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py")
+            new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
+            new_dag_file.flush()
 
-        def create_dag_folder(dag_id):
-            dag_home = tempfile.mkdtemp(dir=tmpdir)
-            dag_file = tempfile.NamedTemporaryFile(dir=dag_home, suffix=".py")
-            dag_file.write(get_dag_string(dag_id).encode())
-            dag_file.flush()
-            return dag_home, dag_file
+            manager = DagProcessorJobRunner(
+                job=Job(),
+                processor=DagFileProcessorManager(
+                    dag_directory=new_dag_home,
+                    max_runs=1,
+                    processor_timeout=timedelta(minutes=10),
+                    signal_conn=MagicMock(),
+                    dag_ids=[],
+                    pickle_dags=False,
+                    async_mode=True,
+                ),
+            )
 
-        with tempfile.TemporaryDirectory() as tmpdir:
-            old_dag_home, old_dag_file = create_dag_folder("test_example_bash_operator.py")
-            new_dag_home, new_dag_file = create_dag_folder("test_scheduler_dags.py")
-            example_dag_home, example_dag_file = create_dag_folder("test_dag_warnings.py")
+            dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
+            other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)
 
-            with mock.patch("airflow.dag_processing.manager.example_dag_folder", example_dag_home):
-                manager = DagProcessorJobRunner(
-                    job=Job(),
-                    processor=DagFileProcessorManager(
-                        dag_directory=new_dag_home,
-                        max_runs=1,
-                        processor_timeout=timedelta(minutes=10),
-                        signal_conn=MagicMock(),
-                        dag_ids=[],
-                        pickle_dags=False,
-                        async_mode=True,
-                    ),
-                )
+            with create_session() as session:
+                # Add DAG from old DAG home to the DB
+                dag = dagbag.get_dag("test_example_bash_operator")
+                dag.fileloc = old_dag_file.name
+                dag.last_parsed_time = timezone.utcnow()
+                dag.sync_to_db(processor_subdir=old_dag_home)
 
-                with create_session() as session:
-                    add_dag_to_db(old_dag_file.name, "test_example_bash_operator", old_dag_home)
-                    add_dag_to_db(new_dag_file.name, "test_start_date_scheduling", new_dag_home)
-                    add_dag_to_db(example_dag_file.name, "test_dag_warnings", example_dag_home)
+                # Add DAG from new DAG home to the DB
+                other_dag = other_dagbag.get_dag("test_start_date_scheduling")
+                other_dag.fileloc = new_dag_file.name
+                other_dag.last_parsed_time = timezone.utcnow()
+                other_dag.sync_to_db(processor_subdir=new_dag_home)
 
-                    manager.processor._file_paths = [new_dag_file, example_dag_file]
+                manager.processor._file_paths = [new_dag_file]
 
-                    active_dag_count = (
-                        session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
-                    )
-                    assert active_dag_count == 3
+                active_dag_count = (
+                    session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                )
+                assert active_dag_count == 2
 
-                    manager.processor._scan_stale_dags()
+                manager.processor._scan_stale_dags()
 
-                    active_dag_count = (
-                        session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
-                    )
-                    assert active_dag_count == 2
+                active_dag_count = (
+                    session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                )
+                assert active_dag_count == 1
 
     @mock.patch(
         "airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock
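
The restored test drives exactly this path handling: it writes DAG files into two sibling temporary folders, registers one DAG from each in the DB, points the manager at only new_dag_home, and expects _scan_stale_dags to deactivate the DAG from old_dag_home (active count 2 -> 1). A minimal sketch of that tempfile pattern outside the Airflow test harness, with illustrative stand-ins for the DAG files:

    import os
    import tempfile

    with tempfile.TemporaryDirectory() as tmpdir:
        # Two sibling "DAG homes", mirroring old_dag_home/new_dag_home in the test.
        old_dag_home = tempfile.mkdtemp(dir=tmpdir)
        new_dag_home = tempfile.mkdtemp(dir=tmpdir)
        old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py")
        old_dag_file.write(b"# stand-in for test_example_bash_operator.py\n")
        old_dag_file.flush()
        new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py")
        new_dag_file.write(b"# stand-in for test_scheduler_dags.py\n")
        new_dag_file.flush()

        # With new_dag_home as the current dag_directory, only the old file fails
        # the commonpath check and would be deactivated by _scan_stale_dags.
        for path in (old_dag_file.name, new_dag_file.name):
            stale = os.path.commonpath([path, new_dag_home]) != new_dag_home
            print(path, "stale" if stale else "active")
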
