Skip to content

Commit

Permalink
Add basic support for git and local dag bundles (apache#42689)
Browse files Browse the repository at this point in the history
This is a first pass at git and local dag bundles, and the bundle
interface. These will certainly be expanded on (e.g. to support
non-public repos).
  • Loading branch information
jedcunningham authored Nov 8, 2024
1 parent 8906f12 commit a212bf8
Show file tree
Hide file tree
Showing 7 changed files with 447 additions and 0 deletions.
8 changes: 8 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ core:
type: string
example: ~
default: "{AIRFLOW_HOME}/dags"
dag_bundle_storage_path:
description: |
The folder where Airflow bundles can store files locally (if required).
By default, this is ``tempfile.gettempdir()/airflow/dag_bundles``. This path must be absolute.
version_added: 3.0.0
type: string
example: "tempfile.gettempdir()/dag_bundles"
default: ~
hostname_callable:
description: |
Hostname by providing a path to a callable, which will resolve the hostname.
Expand Down
16 changes: 16 additions & 0 deletions airflow/dag_processing/bundles/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
82 changes: 82 additions & 0 deletions airflow/dag_processing/bundles/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import tempfile
from abc import ABC, abstractmethod
from pathlib import Path

from airflow.configuration import conf


class BaseDagBundle(ABC):
    """
    Base class for DAG bundles.

    DAG bundles are used both by the DAG processor and by a worker when running a task. These usage
    patterns are different, however.

    When running a task, we know what version of the bundle we need (assuming the bundle supports
    versioning). And we likely only need to keep this specific bundle version around for as long as we
    have tasks running using that bundle version. This also means, that on a single worker, it's possible
    that multiple versions of the same bundle are used at the same time.

    In contrast, the DAG processor uses a bundle to keep the DAGs from that bundle up to date. There will
    not be multiple versions of the same bundle in use at the same time. The DAG processor will always use
    the latest version.

    :param name: String identifier for the DAG bundle
    :param version: Version of the DAG bundle (Optional)
    """

    # Subclasses that can pin and later re-create a specific version set this to True.
    supports_versioning: bool = False

    def __init__(self, *, name: str, version: str | None = None) -> None:
        self.name = name
        self.version = version

    @property
    def _dag_bundle_root_storage_path(self) -> Path:
        """
        Where bundles can store DAGs on disk (if local disk is required).

        This is the root path, shared by various bundles. Each bundle should have its own subdirectory.

        :return: the configured ``[core] dag_bundle_storage_path`` when it is set to a non-empty value,
            otherwise ``<tempfile.gettempdir()>/airflow/dag_bundles``.
        """
        # The option has no default value in the config template, so request an explicit ``None``
        # fallback instead of depending on how the config parser treats an unset option; that keeps
        # the temp-dir default below reachable.
        if configured_location := conf.get("core", "dag_bundle_storage_path", fallback=None):
            return Path(configured_location)
        return Path(tempfile.gettempdir(), "airflow", "dag_bundles")

    @property
    @abstractmethod
    def path(self) -> Path:
        """
        Path for this bundle.

        Airflow will use this path to load/execute the DAGs from the bundle.
        """

    @abstractmethod
    def get_current_version(self) -> str:
        """
        Retrieve a string that represents the version of the DAG bundle.

        Airflow can use this value to retrieve this same bundle version later.
        """

    @abstractmethod
    def refresh(self) -> None:
        """Retrieve the latest version of the files in the bundle."""
115 changes: 115 additions & 0 deletions airflow/dag_processing/bundles/git.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import os
from typing import TYPE_CHECKING

from git import Repo
from git.exc import BadName

from airflow.dag_processing.bundles.base import BaseDagBundle
from airflow.exceptions import AirflowException

if TYPE_CHECKING:
from pathlib import Path


class GitDagBundle(BaseDagBundle):
    """
    git DAG bundle - exposes a git repository as a DAG bundle.

    Instead of cloning the repository every time, we clone the repository once into a bare repo from the
    source and then do a clone for each version from there.

    Constructing the bundle performs the clones (if they do not exist yet) and leaves the working
    clone either reset to ``version`` (when one is given) or refreshed to the latest ``head``.

    :param repo_url: URL of the git repository
    :param head: Branch or tag for this DAG bundle
    :raises AirflowException: if ``version`` is given but cannot be found in the repository,
        even after fetching from the source.
    """

    supports_versioning = True

    def __init__(self, *, repo_url: str, head: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.repo_url = repo_url
        self.head = head

        # One bare clone per bundle name; every per-version working clone is cut from it.
        self.bare_repo_path = self._dag_bundle_root_storage_path / "git" / self.name
        self._clone_bare_repo_if_required()
        self._ensure_version_in_bare_repo()
        self._clone_repo_if_required()
        self.repo.git.checkout(self.head)

        if self.version:
            # The working clone may predate the commit; its origin is the local bare repo, which
            # _ensure_version_in_bare_repo() has already verified contains the version.
            if not self._has_version(self.repo, self.version):
                self.repo.remotes.origin.fetch()

            # Pin HEAD to the exact commit and make index + working tree match it.
            self.repo.head.set_reference(self.repo.commit(self.version))
            self.repo.head.reset(index=True, working_tree=True)
        else:
            self.refresh()

    def _clone_repo_if_required(self) -> None:
        """Create the working clone (from the bare repo) if missing, then open it as ``self.repo``."""
        if not self.path.exists():
            Repo.clone_from(
                url=self.bare_repo_path,
                to_path=self.path,
            )
        self.repo = Repo(self.path)

    def _clone_bare_repo_if_required(self) -> None:
        """Create the bare clone of ``repo_url`` if missing, then open it as ``self.bare_repo``."""
        if not self.bare_repo_path.exists():
            Repo.clone_from(
                url=self.repo_url,
                to_path=self.bare_repo_path,
                bare=True,
            )
        self.bare_repo = Repo(self.bare_repo_path)

    def _fetch_bare_repo(self) -> None:
        """Force-update every local branch of the bare repo from its origin."""
        self.bare_repo.remotes.origin.fetch("+refs/heads/*:refs/heads/*")

    def _ensure_version_in_bare_repo(self) -> None:
        """
        Make sure the requested version exists in the bare repo, fetching once if it does not.

        :raises AirflowException: if the version is still missing after the fetch.
        """
        if not self.version:
            return
        if not self._has_version(self.bare_repo, self.version):
            self._fetch_bare_repo()
            if not self._has_version(self.bare_repo, self.version):
                raise AirflowException(f"Version {self.version} not found in the repository")

    def __hash__(self) -> int:
        return hash((self.name, self.get_current_version()))

    def get_current_version(self) -> str:
        """Return the hexsha of the commit the working clone's HEAD points at."""
        return self.repo.head.commit.hexsha

    @property
    def path(self) -> Path:
        """Working-clone directory: one per (bundle name, version-or-head) pair."""
        location = self.version or self.head
        return self._dag_bundle_root_storage_path / "git" / f"{self.name}+{location}"

    @staticmethod
    def _has_version(repo: Repo, version: str) -> bool:
        """
        Report whether ``version`` resolves to a commit in ``repo``.

        :param repo: repository to look the version up in
        :param version: commit-ish to resolve (normally a full commit hexsha)
        """
        try:
            repo.commit(version)
        except (BadName, ValueError):
            # An unknown ref name surfaces as BadName. A well-formed hexsha that is absent from
            # the object database is presumably reported by GitPython as ValueError instead; both
            # simply mean "not here (yet)", which callers use as the signal to fetch.
            return False
        return True

    def refresh(self) -> None:
        """
        Bring the bare repo and the working clone up to date with the source.

        :raises AirflowException: if this bundle is pinned to a specific version.
        """
        if self.version:
            raise AirflowException("Refreshing a specific version is not supported")

        self._fetch_bare_repo()
        self.repo.remotes.origin.pull()
47 changes: 47 additions & 0 deletions airflow/dag_processing/bundles/local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from pathlib import Path

from airflow.dag_processing.bundles.base import BaseDagBundle
from airflow.exceptions import AirflowException


class LocalDagBundle(BaseDagBundle):
    """
    Local DAG bundle - serves DAGs straight out of a directory on the local filesystem.

    :param local_folder: Local folder where the DAGs are stored
    """

    supports_versioning = False

    def __init__(self, *, local_folder: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self._path = Path(local_folder)

    @property
    def path(self) -> Path:
        """Directory this bundle exposes, built from ``local_folder``."""
        return self._path

    def refresh(self) -> None:
        """Do nothing: a local directory has nothing to fetch."""
        return None

    def get_current_version(self) -> str:
        """Always raise ``AirflowException``; this bundle type is not versioned."""
        raise AirflowException("Not versioned!")
1 change: 1 addition & 0 deletions hatch_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@
# We should remove the limitation after 2.3 is released and our dependencies are updated to handle it
"flask>=2.2.1,<2.3",
"fsspec>=2023.10.0",
"gitpython>=3.1.40",
'google-re2>=1.0;python_version<"3.12"',
'google-re2>=1.1;python_version>="3.12"',
"gunicorn>=20.1.0",
Expand Down
Loading

0 comments on commit a212bf8

Please sign in to comment.