Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

To support task display_name #1278

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
120 changes: 76 additions & 44 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,34 @@ def create_test_task_metadata(
)


def _get_task_id_and_args(
    node: DbtNode,
    use_task_group: bool,
    set_task_id_by_node: Callable[..., Any] | None,
    resource_suffix: str,
) -> tuple[str, dict[str, Any]]:
    """
    Work out the Airflow task ID for a dbt node, plus any extra operator arguments it implies.

    The rules are applied in this order:

    1. ``use_task_group`` is true: the task ID is ``resource_suffix`` on its own and
       ``set_task_id_by_node`` is not called.
    2. ``set_task_id_by_node`` is given: its return value for ``node`` becomes the task ID, and the
       default ``<node.name>_<resource_suffix>`` name is handed back as ``task_display_name``.
    3. Otherwise: the task ID is the default ``<node.name>_<resource_suffix>`` name.

    :param node: The dbt node the task is being created for; only ``node.name`` is read here.
    :param use_task_group: Whether the task is rendered inside its own task group.
    :param set_task_id_by_node: Optional user callable mapping a dbt node to a custom task ID.
    :param resource_suffix: Suffix describing the kind of task (e.g. ``"run"``, ``"source"``).
    :returns: A ``(task_id, args_update)`` tuple. ``args_update`` is a new dictionary that is either
        empty or contains only the ``task_display_name`` key.
    """
    if use_task_group:
        return resource_suffix, {}

    default_name = f"{node.name}_{resource_suffix}"
    # Compare against None explicitly so that any callable the user supplies is honoured,
    # regardless of how that object behaves in a boolean context.
    if set_task_id_by_node is not None:
        return set_task_id_by_node(node), {"task_display_name": default_name}

    return default_name, {}


def create_task_metadata(
node: DbtNode,
execution_mode: ExecutionMode,
args: dict[str, Any],
dbt_dag_task_group_identifier: str,
use_task_group: bool = False,
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE,
set_task_id_by_node: Callable[..., Any] | None = None,
) -> TaskMetadata | None:
"""
Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node.
Expand All @@ -155,57 +176,64 @@ def create_task_metadata(
DbtResourceType.TEST: "DbtTest",
DbtResourceType.SOURCE: "DbtSource",
}
args = {**args, **{"models": node.resource_name}}

if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class:
extra_context = {
"dbt_node_config": node.context_dict,
"dbt_dag_task_group_identifier": dbt_dag_task_group_identifier,
}
if node.resource_type == DbtResourceType.MODEL:
task_id = f"{node.name}_run"
if use_task_group is True:
task_id = "run"
elif node.resource_type == DbtResourceType.SOURCE:
if (source_rendering_behavior == SourceRenderingBehavior.NONE) or (
source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS
and node.has_freshness is False
and node.has_test is False
):
return None
# TODO: https://github.com/astronomer/astronomer-cosmos
# pragma: no cover
task_id = f"{node.name}_source"
args["select"] = f"source:{node.resource_name}"
args.pop("models")
if use_task_group is True:
task_id = node.resource_type.value
if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL:
# render sources without freshness as empty operators
return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator")
else:
task_id = f"{node.name}_{node.resource_type.value}"
if use_task_group is True:
task_id = node.resource_type.value

task_metadata = TaskMetadata(
id=task_id,
owner=node.owner,
operator_class=calculate_operator_class(
execution_mode=execution_mode, dbt_class=dbt_resource_to_class[node.resource_type]
),
arguments=args,
extra_context=extra_context,
)
return task_metadata
else:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a critical part of the code, and I feel that the bug fix does not justify changing all these lines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for reviewing this over the weekend. I apologize for the lack of clarity in my pull request.

The purpose of these changes is as follows:

  1. Adding a parameter to enable the use of display_name
  2. Addressing Ruff errors that occurred as a result of point 1
  3. Updating the documentation (I also noticed that there was some missing information about "source_rendering_behavior" and have added that as well).

if (
DbtResourceType(node.resource_type) not in DEFAULT_DBT_RESOURCES
or node.resource_type not in dbt_resource_to_class
):
msg = (
f"Unavailable conversion function for <{node.resource_type}> (node <{node.unique_id}>). "
"Define a converter function using render_config.node_converters."
)
logger.warning(msg)
return None

args = {**args, **{"models": node.resource_name}}
extra_context = {
"dbt_node_config": node.context_dict,
"dbt_dag_task_group_identifier": dbt_dag_task_group_identifier,
}

if node.resource_type == DbtResourceType.MODEL:
task_id, args_update = _get_task_id_and_args(node, use_task_group, set_task_id_by_node, "run")
args.update(args_update)
elif node.resource_type == DbtResourceType.SOURCE:
if (source_rendering_behavior == SourceRenderingBehavior.NONE) or (
source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS
and node.has_freshness is False
and node.has_test is False
):
return None
# TODO: https://github.com/astronomer/astronomer-cosmos
# pragma: no cover
args["select"] = f"source:{node.resource_name}"
args.pop("models")
task_id, args_update = _get_task_id_and_args(node, use_task_group, set_task_id_by_node, "source")
args.update(args_update)
if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL:
# render sources without freshness as empty operators
# empty operator does not accept custom parameters (e.g., profile_args). recreate the args.
if "task_display_name" in args:
args = {"task_display_name": args["task_display_name"]}
else:
args = {}
return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator", arguments=args)
else:
task_id, args_update = _get_task_id_and_args(
node, use_task_group, set_task_id_by_node, node.resource_type.value
)
args.update(args_update)

task_metadata = TaskMetadata(
id=task_id,
owner=node.owner,
operator_class=calculate_operator_class(
execution_mode=execution_mode, dbt_class=dbt_resource_to_class[node.resource_type]
),
arguments=args,
extra_context=extra_context,
)
return task_metadata


def generate_task_or_group(
dag: DAG,
Expand All @@ -217,6 +245,7 @@ def generate_task_or_group(
source_rendering_behavior: SourceRenderingBehavior,
test_indirect_selection: TestIndirectSelection,
on_warning_callback: Callable[..., Any] | None,
set_task_id_by_node: Callable[..., Any] | None = None,
**kwargs: Any,
) -> BaseOperator | TaskGroup | None:
task_or_group: BaseOperator | TaskGroup | None = None
Expand All @@ -234,6 +263,7 @@ def generate_task_or_group(
dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group),
use_task_group=use_task_group,
source_rendering_behavior=source_rendering_behavior,
set_task_id_by_node=set_task_id_by_node,
)

# In most cases, we'll map one DBT node to one Airflow task
Expand Down Expand Up @@ -335,6 +365,7 @@ def build_airflow_graph(
node_converters = render_config.node_converters or {}
test_behavior = render_config.test_behavior
source_rendering_behavior = render_config.source_rendering_behavior
set_task_id_by_node = render_config.set_task_id_by_node
tasks_map = {}
task_or_group: TaskGroup | BaseOperator

Expand All @@ -356,6 +387,7 @@ def build_airflow_graph(
source_rendering_behavior=source_rendering_behavior,
test_indirect_selection=test_indirect_selection,
on_warning_callback=on_warning_callback,
set_task_id_by_node=set_task_id_by_node,
node=node,
)
if task_or_group is not None:
Expand Down
3 changes: 3 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class RenderConfig:
:param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``.
:param enable_mock_profile: Allows to enable/disable mocking profile. Enabled by default. Mock profiles are useful for parsing Cosmos DAGs in the CI, but should be disabled to benefit from partial parsing (since Cosmos 1.4).
:param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6).
:param airflow_vars_to_purge_dbt_ls_cache: Specify Airflow variables that will affect the LoadMode.DBT_LS cache.
:param set_task_id_by_node: A callable that takes a dbt node as input and returns the task ID. This allows users to assign a custom task ID separate from the display name.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We received feedback from end-users that Cosmos already has too many configurations. Would it be possible for us to handle non-ASCII names the way other libraries do, without forcing users to define a new configuration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for not providing users with an option to specify task_id themselves is that automatically generating task_id for names written in non-ASCII characters is a highly challenging task.

For example, while slugify (as mentioned in the documentation) can work in some cases, it’s not suitable for use in actual production code.

Examples:

  • slugify converts names based on pronunciation, which makes it difficult to keep Task IDs unique due to homophones. In Japanese, for instance, "accounting" and "finance" are represented by the same word.
  • Chinese and Japanese both use similar kanji characters, but these characters have different pronunciations in each language.

In other words, what’s needed is "translation" rather than a mechanical "conversion" like slugify for the sake of automation.

To ensure unique task IDs, some form of mapping would likely be necessary.
For example: { "顧客": "customers", "注文": "orders" }.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the concern about having too many options for the end users. However, I believe it would be challenging to automatically generate task_id on behalf of the users.

May I please propose adding an option for this purpose? If that’s acceptable, I’d like to work on a revision that minimizes code changes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @t0momi219, Thanks a lot for explaining these differences. I speak neither Chinese nor Japanese, so your explanation was extremely helpful. I now understand the need.

Yes, please. It would be amazing if you could make changes to make this customizable without changing the codebase.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, given the explanation I'm inclined to provide & expose it as a configuration. Like that someone who really needs to use it can use.

Also +1 to minimising the code change, easier to review & address the scope of a particular issue. Thanks for working on this @t0momi219

"""

emit_datasets: bool = True
Expand All @@ -80,6 +82,7 @@ class RenderConfig:
enable_mock_profile: bool = True
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE
airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list)
set_task_id_by_node: Callable[..., Any] | None = None

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.env_vars:
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ Cosmos offers a number of configuration options to customize its behavior. For m
Compiled SQL <compiled-sql>
Logging <logging>
Caching <caching>
Task display name <task-display-name>
2 changes: 2 additions & 0 deletions docs/configuration/render-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ The ``RenderConfig`` class takes the following arguments:
- ``env_vars``: (available in v1.2.5, use ``ProjectConfig.env_vars`` for v1.3.0 onwards) A dictionary of environment variables for rendering. Only supported when using ``load_method=LoadMode.DBT_LS``.
- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``
- ``airflow_vars_to_purge_dbt_ls_cache``: (new in v1.5) Specify Airflow variables that will affect the ``LoadMode.DBT_LS`` cache. See `Caching <./caching.html>`_ for more information.
- ``source_rendering_behavior``: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6). See `Source Nodes Rendering <./source-nodes-rendering.html>`_ for more information.
- ``set_task_id_by_node``: A callable that takes a dbt node as input and returns the task ID. This function allows users to set a custom task_id independently of the model name, which can be specified as the task’s display_name. This way, task_id can be modified using a user-defined function, while the model name remains as the task’s display name. The display_name parameter is available in Airflow 2.9 and above. See `Task display name <./task-display-name.html>`_ for more information.

Customizing how nodes are rendered (experimental)
-------------------------------------------------
Expand Down
30 changes: 30 additions & 0 deletions docs/configuration/task-display-name.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
.. _task-display-name:

Task display name
=================

.. note::
This feature is only available for Airflow >= 2.9.

In Airflow, ``task_id`` does not support non-ASCII characters. Therefore, if users wish to use non-ASCII characters (such as their native language) as display names while keeping ``task_id`` in ASCII, they can use the ``task_display_name`` parameter.

To work with projects that use non-ASCII characters in model names, the ``set_task_id_by_node`` field of ``RenderConfig`` can be utilized.

Example:

You can provide a function to convert the model name to an ASCII-compatible format. The function’s output is used as the task ID, while the display name on Airflow remains as the original model name.

.. code-block:: python

from slugify import slugify


def set_task_id_by_node(node):
return slugify(node.name)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do this on behalf of the users?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in my previous comment.



from cosmos import DbtTaskGroup, RenderConfig

jaffle_shop = DbtTaskGroup(
render_config=RenderConfig(set_task_id_by_node=set_task_id_by_node)
)
117 changes: 117 additions & 0 deletions tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,123 @@ def test_create_task_metadata_snapshot(caplog):
assert metadata.arguments == {"models": "my_snapshot"}


def _set_task_id_by_node(node: DbtNode) -> str:
    """Custom task-ID callable used by ``test_create_task_metadata_set_task_id_by_node``."""
    resource_kind = node.resource_type.value
    return f"new_task_id_{node.name}_{resource_kind}"


@pytest.mark.skipif(
    version.parse(airflow_version) < version.parse("2.9"),
    reason="Airflow task did not have display_name until the 2.9 release",
)
@pytest.mark.parametrize(
    "node_type,node_id,set_task_id_by_node,use_task_group,expected_node_id,expected_display_name",
    [
        # set_task_id_by_node is None (default)
        (
            DbtResourceType.MODEL,
            f"{DbtResourceType.MODEL.value}.my_folder.test_node",
            None,
            False,
            "test_node_run",
            None,
        ),
        (
            DbtResourceType.SOURCE,
            f"{DbtResourceType.SOURCE.value}.my_folder.test_node",
            None,
            False,
            "test_node_source",
            None,
        ),
        (
            DbtResourceType.SEED,
            f"{DbtResourceType.SEED.value}.my_folder.test_node",
            None,
            False,
            "test_node_seed",
            None,
        ),
        # set_task_id_by_node is passed and use_task_group is False
        (
            DbtResourceType.MODEL,
            f"{DbtResourceType.MODEL.value}.my_folder.test_node",
            _set_task_id_by_node,
            False,
            "new_task_id_test_node_model",
            "test_node_run",
        ),
        (
            DbtResourceType.SOURCE,
            f"{DbtResourceType.SOURCE.value}.my_folder.test_node",
            _set_task_id_by_node,
            False,
            "new_task_id_test_node_source",
            "test_node_source",
        ),
        (
            DbtResourceType.SEED,
            f"{DbtResourceType.SEED.value}.my_folder.test_node",
            _set_task_id_by_node,
            False,
            "new_task_id_test_node_seed",
            "test_node_seed",
        ),
        # set_task_id_by_node is passed and use_task_group is True
        (
            DbtResourceType.MODEL,
            f"{DbtResourceType.MODEL.value}.my_folder.test_node",
            _set_task_id_by_node,
            True,
            "run",
            None,
        ),
        (
            DbtResourceType.SOURCE,
            f"{DbtResourceType.SOURCE.value}.my_folder.test_node",
            _set_task_id_by_node,
            True,
            "source",
            None,
        ),
        (
            DbtResourceType.SEED,
            f"{DbtResourceType.SEED.value}.my_folder.test_node",
            _set_task_id_by_node,
            True,
            "seed",
            None,
        ),
    ],
)
def test_create_task_metadata_set_task_id_by_node(
    node_type, node_id, set_task_id_by_node, use_task_group, expected_node_id, expected_display_name
):
    """
    Check the task ID and ``task_display_name`` produced by ``create_task_metadata``.

    Three situations are covered for model, source and seed nodes: no custom callable, a custom
    callable outside a task group, and a custom callable inside a task group. Each node's
    ``unique_id`` prefix matches its resource type so the fixtures stay self-consistent.
    """
    node = DbtNode(
        unique_id=node_id,
        resource_type=node_type,
        depends_on=[],
        file_path="",
        tags=[],
        config={},
    )
    metadata = create_task_metadata(
        node,
        execution_mode=ExecutionMode.LOCAL,
        args={},
        dbt_dag_task_group_identifier="",
        use_task_group=use_task_group,
        set_task_id_by_node=set_task_id_by_node,
        # ALL is required so that source nodes are rendered at all.
        source_rendering_behavior=SourceRenderingBehavior.ALL,
    )
    assert metadata.id == expected_node_id
    if expected_display_name is None:
        assert "task_display_name" not in metadata.arguments
    else:
        assert metadata.arguments["task_display_name"] == expected_display_name


@pytest.mark.parametrize(
"node_type,node_unique_id,test_indirect_selection,additional_arguments",
[
Expand Down
Loading