Skip to content

Commit

Permalink
partial kwargs deserialized MappedOperator set on unmapped Operator (a…
Browse files Browse the repository at this point in the history
…pache#42563)

Forwarding the partial kwargs to the underlying operator is done in
the standard (non-serialization) case and thus it should
be done here as well for things in the webserver that rely on these
fixed attributes.

An example is the `Triggered DAG` link for the `TriggerDagRunOperator`.
  • Loading branch information
fredthomsen authored Nov 18, 2024
1 parent 681651d commit 3f43bc4
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
2 changes: 2 additions & 0 deletions airflow/models/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,8 @@ def unmap(self, resolve: None | Mapping[str, Any] | tuple[Context, Session]) ->
from airflow.serialization.serialized_objects import SerializedBaseOperator

op = SerializedBaseOperator(task_id=self.task_id, params=self.params, _airflow_from_mapped=True)
for partial_attr, value in self.partial_kwargs.items():
setattr(op, partial_attr, value)
SerializedBaseOperator.populate_operator(op, self.operator_class)
if self.dag is not None: # For Mypy; we only serialize tasks in a DAG so the check always satisfies.
SerializedBaseOperator.set_task_dag_references(op, self.dag)
Expand Down
6 changes: 5 additions & 1 deletion tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -2529,7 +2529,11 @@ def test_operator_expand_deserialized_unmap():
ser_normal = BaseSerialization.serialize(normal)
deser_normal = BaseSerialization.deserialize(ser_normal)
deser_normal.dag = None
assert deser_mapped.unmap(None) == deser_normal
unmapped_deser_mapped = deser_mapped.unmap(None)

assert type(unmapped_deser_mapped) is type(deser_normal) is SerializedBaseOperator
assert unmapped_deser_mapped.task_id == deser_normal.task_id == "a"
assert unmapped_deser_mapped.executor_config == deser_normal.executor_config == {"a": "b"}


@pytest.mark.db_test
Expand Down

0 comments on commit 3f43bc4

Please sign in to comment.