AIP-62: Add HookLineageCollector (apache#40335)
* Add HookLineageCollector that, during task execution,
registers and holds lineage sent from hooks.

Add HookLineageReader, whose registration determines whether
HookLineageCollector is enabled to process lineage sent from hooks.

Add Dataset factories to make sure Datasets registered with
HookLineageCollector are AIP-60 compliant.

Signed-off-by: Jakub Dardzinski <[email protected]>

* Remove default `create_dataset` method.

Add section in experimental lineage docs.

Signed-off-by: Jakub Dardzinski <[email protected]>

---------

Signed-off-by: Jakub Dardzinski <[email protected]>
JDarDagran authored Jul 15, 2024
1 parent 3448bdd commit cd68840
Showing 7 changed files with 395 additions and 4 deletions.
181 changes: 181 additions & 0 deletions airflow/lineage/hook.py
@@ -0,0 +1,181 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Union

import attr

from airflow.datasets import Dataset
from airflow.hooks.base import BaseHook
from airflow.io.store import ObjectStore
from airflow.providers_manager import ProvidersManager
from airflow.utils.log.logging_mixin import LoggingMixin

# Context that sent the lineage: either a hook or an object store.
LineageContext = Union[BaseHook, ObjectStore]

_hook_lineage_collector: HookLineageCollector | None = None


@attr.define
class HookLineage:
"""Holds lineage collected by HookLineageCollector."""

inputs: list[tuple[Dataset, LineageContext]] = attr.ib(factory=list)
outputs: list[tuple[Dataset, LineageContext]] = attr.ib(factory=list)


class HookLineageCollector(LoggingMixin):
"""
HookLineageCollector is a base class for collecting hook lineage information.
It is used to collect the input and output datasets of a hook execution.
"""

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.inputs: list[tuple[Dataset, LineageContext]] = []
self.outputs: list[tuple[Dataset, LineageContext]] = []

def create_dataset(
self, scheme: str | None, uri: str | None, dataset_kwargs: dict | None, dataset_extra: dict | None
) -> Dataset | None:
"""
Create a Dataset instance using the provided parameters.
This method attempts to create a Dataset instance using the given parameters.
It first checks if a URI is provided and falls back to using the default dataset factory
with the given URI if no other information is available.
If a scheme is provided but no URI, it attempts to find a dataset factory that matches
the given scheme. If no such factory is found, it logs an error message and returns None.
If dataset_kwargs is provided, it is used to pass additional parameters to the Dataset
factory. The dataset_extra parameter is also passed to the factory as an ``extra`` parameter.
"""
if uri:
# Fallback to default factory using the provided URI
return Dataset(uri=uri, extra=dataset_extra)

if not scheme:
self.log.debug(
"Missing required parameter: either 'uri' or 'scheme' must be provided to create a Dataset."
)
return None

dataset_factory = ProvidersManager().dataset_factories.get(scheme)
if not dataset_factory:
self.log.debug("Unsupported scheme: %s. Please provide a valid URI to create a Dataset.", scheme)
return None

dataset_kwargs = dataset_kwargs or {}
try:
return dataset_factory(**dataset_kwargs, extra=dataset_extra)
except Exception as e:
self.log.debug("Failed to create dataset. Skipping. Error: %s", e)
return None

def add_input_dataset(
self,
context: LineageContext,
scheme: str | None = None,
uri: str | None = None,
dataset_kwargs: dict | None = None,
dataset_extra: dict | None = None,
):
"""Add the input dataset and its corresponding hook execution context to the collector."""
dataset = self.create_dataset(
scheme=scheme, uri=uri, dataset_kwargs=dataset_kwargs, dataset_extra=dataset_extra
)
if dataset:
self.inputs.append((dataset, context))

def add_output_dataset(
self,
context: LineageContext,
scheme: str | None = None,
uri: str | None = None,
dataset_kwargs: dict | None = None,
dataset_extra: dict | None = None,
):
"""Add the output dataset and its corresponding hook execution context to the collector."""
dataset = self.create_dataset(
scheme=scheme, uri=uri, dataset_kwargs=dataset_kwargs, dataset_extra=dataset_extra
)
if dataset:
self.outputs.append((dataset, context))

@property
def collected_datasets(self) -> HookLineage:
"""Get the collected hook lineage information."""
return HookLineage(self.inputs, self.outputs)

@property
def has_collected(self) -> bool:
"""Check if any datasets have been collected."""
return len(self.inputs) != 0 or len(self.outputs) != 0


class NoOpCollector(HookLineageCollector):
"""
NoOpCollector is a hook lineage collector that does nothing.
It is used when you want to disable lineage collection.
"""

    def add_input_dataset(self, *_, **__):
        pass

    def add_output_dataset(self, *_, **__):
        pass

@property
    def collected_datasets(self) -> HookLineage:
self.log.warning(
"Data lineage tracking is disabled. Register a hook lineage reader to start tracking hook lineage."
)
return HookLineage([], [])


class HookLineageReader(LoggingMixin):
"""Class used to retrieve the hook lineage information collected by HookLineageCollector."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.lineage_collector = get_hook_lineage_collector()

    def retrieve_hook_lineage(self) -> HookLineage:
        """Retrieve hook lineage from HookLineageCollector."""
        return self.lineage_collector.collected_datasets


def get_hook_lineage_collector() -> HookLineageCollector:
"""Get singleton lineage collector."""
global _hook_lineage_collector
if not _hook_lineage_collector:
from airflow import plugins_manager

plugins_manager.initialize_hook_lineage_readers_plugins()
if plugins_manager.hook_lineage_reader_classes:
_hook_lineage_collector = HookLineageCollector()
else:
_hook_lineage_collector = NoOpCollector()
return _hook_lineage_collector
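
To make the intended flow concrete, here is a minimal usage sketch (not part of the commit; ``MyHook`` is hypothetical, and a real ``HookLineageCollector`` is only returned when a ``HookLineageReader`` plugin is registered, otherwise the ``NoOpCollector`` discards everything):

.. code-block:: python

    from airflow.hooks.base import BaseHook
    from airflow.lineage.hook import get_hook_lineage_collector


    class MyHook(BaseHook):
        """Hypothetical hook that reports lineage for the files it copies."""

        def copy_file(self, src_uri: str, dest_uri: str) -> None:
            # ... perform the actual copy ...
            collector = get_hook_lineage_collector()
            # With a URI given, create_dataset() builds Dataset(uri=...) directly.
            collector.add_input_dataset(self, uri=src_uri)
            collector.add_output_dataset(self, uri=dest_uri)


    hook = MyHook()
    hook.copy_file("file:///tmp/in", "file:///tmp/out")
    print(get_hook_lineage_collector().collected_datasets)
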
27 changes: 27 additions & 0 deletions airflow/plugins_manager.py
@@ -27,6 +27,7 @@
import os
import sys
import types
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterable

@@ -41,6 +42,8 @@
from airflow.utils.module_loading import import_string, qualname

if TYPE_CHECKING:
from airflow.lineage.hook import HookLineageReader

try:
import importlib_metadata as metadata
except ImportError:
@@ -75,6 +78,7 @@
registered_operator_link_classes: dict[str, type] | None = None
registered_ti_dep_classes: dict[str, type] | None = None
timetable_classes: dict[str, type[Timetable]] | None = None
hook_lineage_reader_classes: list[type[HookLineageReader]] | None = None
priority_weight_strategy_classes: dict[str, type[PriorityWeightStrategy]] | None = None
"""
Mapping of class names to class of OperatorLinks registered by plugins.
@@ -176,8 +180,12 @@ class AirflowPlugin:
# A list of timetable classes that can be used for DAG scheduling.
timetables: list[type[Timetable]] = []

# A list of listeners that can be used for tracking task and DAG states.
listeners: list[ModuleType | object] = []

# A list of hook lineage reader classes that can be used for reading lineage information from a hook.
hook_lineage_readers: list[type[HookLineageReader]] = []

# A list of priority weight strategy classes that can be used for calculating tasks weight priority.
priority_weight_strategies: list[type[PriorityWeightStrategy]] = []

@@ -483,6 +491,25 @@ def initialize_timetables_plugins():
}


def initialize_hook_lineage_readers_plugins():
"""Collect hook lineage reader classes registered by plugins."""
global hook_lineage_reader_classes

if hook_lineage_reader_classes is not None:
return

ensure_plugins_loaded()

if plugins is None:
raise AirflowPluginException("Can't load plugins.")

log.debug("Initialize hook lineage readers plugins")

hook_lineage_reader_classes = []
for plugin in plugins:
hook_lineage_reader_classes.extend(plugin.hook_lineage_readers)


def integrate_executor_plugins() -> None:
"""Integrate executor plugins to the context."""
global plugins
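
Whether hooks get a real collector is decided lazily from the registered readers; a quick way to check the effective state (a sketch, not from the commit):

.. code-block:: python

    from airflow.lineage.hook import NoOpCollector, get_hook_lineage_collector

    collector = get_hook_lineage_collector()
    if isinstance(collector, NoOpCollector):
        # No plugin registered a HookLineageReader, so lineage is discarded.
        print("hook lineage collection is disabled")
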
4 changes: 4 additions & 0 deletions airflow/provider.yaml.schema.json
@@ -212,6 +212,10 @@
"handler": {
"type": ["string", "null"],
"description": "Normalization function for specified URI schemes. Import path to a callable taking and returning a SplitResult. 'null' specifies a no-op."
},
"factory": {
"type": ["string", "null"],
"description": "Dataset factory for specified URI. Creates AIP-60 compliant Dataset."
}
}
}
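
The new ``factory`` field is meant to point at a callable that builds an AIP-60 compliant ``Dataset`` from keyword arguments. A hypothetical factory for an ``s3`` scheme (illustrative only; the name and parameters are not from this commit):

.. code-block:: python

    from __future__ import annotations

    from airflow.datasets import Dataset


    def create_s3_dataset(*, bucket: str, key: str, extra: dict | None = None) -> Dataset:
        """Build an AIP-60 compliant Dataset URI for an S3 object."""
        return Dataset(uri=f"s3://{bucket}/{key}", extra=extra)
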
23 changes: 19 additions & 4 deletions airflow/providers_manager.py
@@ -91,6 +91,7 @@ def ensure_prefix(field):
if TYPE_CHECKING:
from urllib.parse import SplitResult

from airflow.datasets import Dataset
from airflow.decorators.base import TaskDecorator
from airflow.hooks.base import BaseHook
from airflow.typing_compat import Literal
@@ -426,6 +427,7 @@ def __init__(self):
self._hooks_dict: dict[str, HookInfo] = {}
self._fs_set: set[str] = set()
self._dataset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {}
self._dataset_factories: dict[str, Callable[..., Dataset]] = {}
self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache() # type: ignore[assignment]
# keeps mapping between connection_types and hook class, package they come from
self._hook_provider_dict: dict[str, HookClassProvider] = {}
@@ -523,11 +525,12 @@ def initialize_providers_filesystems(self):
self._discover_filesystems()

@provider_info_cache("dataset_uris")
    def initialize_providers_dataset_uri_handlers_and_factories(self):
        """Lazy initialization of provider dataset URI handlers and factories."""
        self.initialize_providers_list()
        self._discover_dataset_uri_handlers_and_factories()

@provider_info_cache("hook_lineage_writers")
@provider_info_cache("taskflow_decorators")
def initialize_providers_taskflow_decorator(self):
"""Lazy initialization of providers hooks."""
@@ -878,7 +881,7 @@ def _discover_filesystems(self) -> None:
self._fs_set.add(fs_module_name)
self._fs_set = set(sorted(self._fs_set))

    def _discover_dataset_uri_handlers_and_factories(self) -> None:
from airflow.datasets import normalize_noop

for provider_package, provider in self._provider_dict.items():
@@ -893,6 +896,13 @@
elif not (handler := _correctness_check(provider_package, handler_path, provider)):
continue
self._dataset_uri_handlers.update((scheme, handler) for scheme in schemes)
factory_path = handler_info.get("factory")
if not (
factory_path is not None
and (factory := _correctness_check(provider_package, factory_path, provider))
):
continue
self._dataset_factories.update((scheme, factory) for scheme in schemes)

def _discover_taskflow_decorators(self) -> None:
for name, info in self._provider_dict.items():
@@ -1289,9 +1299,14 @@ def filesystem_module_names(self) -> list[str]:
self.initialize_providers_filesystems()
return sorted(self._fs_set)

@property
def dataset_factories(self) -> dict[str, Callable[..., Dataset]]:
self.initialize_providers_dataset_uri_handlers_and_factories()
return self._dataset_factories

@property
def dataset_uri_handlers(self) -> dict[str, Callable[[SplitResult], SplitResult]]:
        self.initialize_providers_dataset_uri_handlers_and_factories()
return self._dataset_uri_handlers

@property
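
``HookLineageCollector.create_dataset`` resolves factories through the new ``dataset_factories`` property, which triggers discovery on first access. For example (a sketch; the ``s3`` factory and its kwargs are assumptions):

.. code-block:: python

    from airflow.providers_manager import ProvidersManager

    # First access lazily discovers both URI handlers and dataset factories.
    factory = ProvidersManager().dataset_factories.get("s3")
    if factory is not None:
        dataset = factory(bucket="my-bucket", key="data.csv", extra=None)
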
43 changes: 43 additions & 0 deletions docs/apache-airflow/administration-and-deployment/lineage.rst
@@ -89,6 +89,49 @@ has outlets defined (e.g. by using ``add_outlets(..)`` or has out of the box support
.. _precedence: https://docs.python.org/3/reference/expressions.html

Hook Lineage
------------

Airflow provides a powerful feature for tracking data lineage not only between tasks but also from hooks used within those tasks.
This functionality helps you understand how data flows throughout your Airflow pipelines.

A global instance of ``HookLineageCollector`` serves as the central hub for collecting lineage information.
Hooks can send details about datasets they interact with to this collector.
The collector then uses this data to construct AIP-60 compliant Datasets, a standard format for describing datasets.

.. code-block:: python

    from airflow.hooks.base import BaseHook
    from airflow.lineage.hook import get_hook_lineage_collector


    class CustomHook(BaseHook):
        def run(self):
            # run actual code
            collector = get_hook_lineage_collector()
            collector.add_input_dataset(self, scheme="file", dataset_kwargs={"path": "/tmp/in"})
            collector.add_output_dataset(self, scheme="file", dataset_kwargs={"path": "/tmp/out"})

Lineage data collected by the ``HookLineageCollector`` can be accessed using an instance of ``HookLineageReader``,
which is registered in an Airflow plugin.

.. code-block:: python

    from airflow.lineage.hook import HookLineageReader
    from airflow.plugins_manager import AirflowPlugin


    class CustomHookLineageReader(HookLineageReader):
        def get_inputs(self):
            return self.lineage_collector.collected_datasets.inputs


    class HookLineageCollectionPlugin(AirflowPlugin):
        name = "HookLineageCollectionPlugin"
        hook_lineage_readers = [CustomHookLineageReader]

If no ``HookLineageReader`` is registered within Airflow, a default ``NoOpCollector`` is used instead.
This collector does not create AIP-60 compliant datasets or collect lineage information.
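
The lineage gathered so far can then be retrieved through the reader, for example (a sketch building on ``CustomHookLineageReader`` above):

.. code-block:: python

    reader = CustomHookLineageReader()
    lineage = reader.retrieve_hook_lineage()
    print(lineage.inputs, lineage.outputs)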


Lineage Backend
---------------
