Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ground work for ExecutionMode.AIRFLOW_ASYNC #1224

Merged
merged 11 commits into from
Sep 30, 2024

Conversation

pankajkoti
Copy link
Contributor

@pankajkoti pankajkoti commented Sep 25, 2024

This PR is the groundwork for the implementation of ExecutionMode.AIRFLOW_ASYNC (#1120), which - once all other epic tasks are completed - will enable asynchronous execution of dbt resources using Apache Airflow’s deferrable operators. As part of this work, this PR introduces a new option to the enum ExecutionMode : AIRFLOW_ASYNC. When this execution mode is used, Cosmos now creates a setup task that will pre-compile the dbt project SQL and make it available to the remaining dbt tasks. This PR, however, does not yet leverage Airflow's deferrable operators. If users use ExecutionMode.AIRFLOW_ASYNC they will actually be running ExecutionMode.LOCAL operators with this change. The PR (#1230) has a first experimental version of using deferrable operators for task execution.

Setup task as the ground work for a new Execution Mode: ExecutionMode.AIRFLOW_ASYNC:

  • Adds a new operator, DbtCompileAirflowAsyncOperator, as a root task(analogous to a setup task) in the DAG, running the dbt compile command and uploading the compiled SQL files to a remote storage location for subsequent tasks that fetch these compiled SQL files from the remote storage and run them asynchronously using Airflow's deferrable operators.

Airflow Configurations:

  • remote_target_path: Introduces a configurable path to store dbt-generated files remotely, supporting any storage scheme that works with Airflow’s Object Store (e.g., S3, GCS, Azure Blob).
  • remote_target_path_conn_id: Allows specifying a custom connection ID for the remote target path, defaulting to the scheme’s associated Airflow connection if not set.

Example DAG for CI Testing:

Introduces an example DAG (simple_dag_async.py) demonstrating how to use the new execution mode(The execution like mentioned earlier would still run like Execution Mode LOCAL operators at the moment with this PR alone)
This DAG is integrated into the CI pipeline to run integration tests and aims at verifying the functionality of the ExecutionMode.AIRFLOW_ASYNC as and when implementation gets added starting with the experimental implementation in #1230 .

Unit & Integration Tests:

  • Adds comprehensive unit and integration tests to ensure correct behavior.
  • Tests include validation for successful uploads, error handling for misconfigured remote paths, and scenarios where remote_target_path are not set.

Documentation:

  • Adds detailed documentation explaining how to configure and set the ExecutionMode.AIRFLOW_ASYNC.

Scope & Limitations of the feature being introduced:

  1. This feature is meant to be released as Experimental and is also marked so in the documentation.
  2. Currently, it has been scoped for only dbt models to be executed asynchronously (being worked upon in PR Experimental BQ support to run dbt models with ExecutionMode.AIRFLOW_ASYNC #1230), while other resource types would be run synchronously.
  3. BigQuery will be the only supported target database for this execution mode ((being worked upon in PR Experimental BQ support to run dbt models with ExecutionMode.AIRFLOW_ASYNC #1230).

Thus, this PR enhances Cosmos by providing the ground work for more efficient execution of long-running dbt resources

Additional Notes:

  • This feature is planned to be introduced in Cosmos v1.7.0.

related: #1134

Copy link

netlify bot commented Sep 25, 2024

Deploy Preview for sunny-pastelito-5ecb04 ready!

Name Link
🔨 Latest commit 481e149
🔍 Latest deploy log https://app.netlify.com/sites/sunny-pastelito-5ecb04/deploys/66fb149631707200089ba001
😎 Deploy Preview https://deploy-preview-1224--sunny-pastelito-5ecb04.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify site configuration.

Copy link

codecov bot commented Sep 25, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 95.87%. Comparing base (e0a9fd3) to head (3565723).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1224      +/-   ##
==========================================
+ Coverage   95.78%   95.87%   +0.09%     
==========================================
  Files          65       66       +1     
  Lines        3745     3830      +85     
==========================================
+ Hits         3587     3672      +85     
  Misses        158      158              
Flag Coverage Δ
95.87% <100.00%> (+0.09%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

.github/workflows/test.yml Outdated Show resolved Hide resolved
.github/workflows/test.yml Show resolved Hide resolved
@pankajkoti pankajkoti changed the title Draft: dbt compile task Introduce implementation for ExecutionMode.AIRFLOW_ASYNC Sep 30, 2024
@pankajkoti pankajkoti marked this pull request as ready for review September 30, 2024 06:36
@dosubot dosubot bot added the size:XL This PR changes 500-999 lines, ignoring generated files. label Sep 30, 2024
@dosubot dosubot bot added area:config Related to configuration, like YAML files, environment variables, or executer configuration area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc dbt:compile Primarily related to dbt compile command or functionality labels Sep 30, 2024
@pankajkoti pankajkoti changed the title Introduce implementation for ExecutionMode.AIRFLOW_ASYNC Introduce ground work for ExecutionMode.AIRFLOW_ASYNC Sep 30, 2024
Copy link
Collaborator

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @pankajkoti , great work!

Some minor feedback inline.

Something that I was wondering: I know we are currently not uploading the manifest file and other things from target, but we may want to do this in the future. Please, could you confirm that within the folder representing the dbt project, we have a target folder, and that compiled is a folder within it? This way, let's say we want to upload the manifest.json, we'd still be able to do this.

cosmos/airflow/graph.py Outdated Show resolved Hide resolved
cosmos/operators/local.py Outdated Show resolved Hide resolved
cosmos/operators/local.py Show resolved Hide resolved
cosmos/operators/local.py Outdated Show resolved Hide resolved
cosmos/settings.py Outdated Show resolved Hide resolved
docs/getting_started/execution-modes.rst Outdated Show resolved Hide resolved
docs/getting_started/execution-modes.rst Outdated Show resolved Hide resolved
docs/getting_started/execution-modes.rst Outdated Show resolved Hide resolved
docs/getting_started/execution-modes.rst Outdated Show resolved Hide resolved
docs/getting_started/execution-modes.rst Show resolved Hide resolved
@pankajkoti
Copy link
Contributor Author

pankajkoti commented Sep 30, 2024

Something that I was wondering: I know we are currently not uploading the manifest file and other things from target, but we may want to do this in the future. Please, could you confirm that within the folder representing the dbt project, we have a target folder, and that compiled is a folder within it? This way, let's say we want to upload the manifest.json, we'd still be able to do this.

Yes, previously, I was not creating a compiled subdirectory, but now I have added so. Also, as discussed made necessary changes to include the task_group if present in the identifier alongwith dag_id in the remote compiled sql file path.

Copy link
Collaborator

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great, @pankajkoti , thank you for addressing all the feedback. This is the first step of a very promising path, when Airflow worker nodes won't be blocked by running SQL transformations generated with dbt.

Something that we can look into as a follow-up is to introduce a teardown task to delete the dbt_compille artifacts. I logged a follow up task for us to look into this in the future: #1232 1232

@tatiana tatiana changed the title Introduce ground work for ExecutionMode.AIRFLOW_ASYNC Introduce ground work for ExecutionMode.AIRFLOW_ASYNC Sep 30, 2024
@tatiana tatiana merged commit 879b1a3 into main Sep 30, 2024
109 of 125 checks passed
@tatiana tatiana deleted the poc-dbt-compile-task branch September 30, 2024 21:57
pankajkoti added a commit that referenced this pull request Sep 30, 2024
We added the PR branch in the CI test.yml to test out a new example DAG by adding newer environment variables needed for the example DAG. I missed adding a review comment to remove the temporarily added PR branch. This PR removes that temporarily added branch
tatiana pushed a commit that referenced this pull request Sep 30, 2024
We temporarily added a PR (#1224) branch to the CI test.yml to test a
new example DAG, along with the necessary environment variables for the
DAG. However, I missed adding a review comment to remove the branch
after testing. This PR removes the temporarily added branch from the CI
configuration.
@tatiana tatiana mentioned this pull request Oct 2, 2024
tatiana added a commit that referenced this pull request Oct 3, 2024
…_ASYNC` (#1230)

Enable BQ users to run dbt models (`full_refresh`) asynchronously. This
releases the Airflow worker node from waiting while the transformation
(I/O) happens in the dataware house, increasing the overall Airflow task
throughput (more information:
https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html).
As part of this change, we introduce the capability of not using the dbt
command to run actual SQL transformations. This also avoids creating
subprocesses in the worker node (`ExecutionMode.LOCAL` with
`InvocationMode. SUBPROCESS` and `ExecutionMode.VIRTUALENV`) or the
overhead of creating a Kubernetes Pod to execute the actual dbt command
(`ExecutionMode.KUBERNETES`). This can avoid issues related to memory
and CPU usage.

This PR takes advantage of an already implemented async operator in the
Airflow repo by extending it in the Cosmos async operator. It also
utilizes the pre-compiled SQL generated as part of the PR
#1224. It downloads
the generated SQL from a remote location (S3/GCS), which allows us to
decouple from dbt during task execution.

## Details

- Expose `get_profile_type` on ProfileConfig: This aids in database
selection
- ~Add `async_op_args`: A high-level parameter to forward arguments to
the upstream operator (Airflow operator). (This may change in this PR
itself)~ The async operator params are process as kwargs in the
operator_args parameter
- Implement `DbtRunAirflowAsyncOperator`: This initializes the Airflow
Operator, retrieves the SQL query at task runtime from a remote
location, modifies the query as needed, and triggers the upstream
execute method.

## Limitations

- This feature only works when using Airflow 2.8 and above
- The async execution only works for BigQuery
- The async execution only supports running dbt models (other dbt
resources, such as seeds, sources, snapshots, tests, are run using the
`ExecutionMode.LOCAL`)
- This will work only if the user provides sets `full_refresh=True` in
`operator_args` (which means tables will be dropped before being
populated, as implemented in `dbt-core`)
- Users need to use `ProfileMapping` in `ProfileConfig`, since Cosmos
relies on having the connection (credentials) to be able to run the
transformation in BQ without `dbt-core`
- Users must provide the BQ `location` in `operator_args` (this is a
limitation from the `BigQueryInsertJobOperator` that is being used to
implement the native Airflow asynchronous support)

## Testing 

We have added a new dbt project to the repository to facilitate
asynchronous task execution. The goal is to accelerate development
without disrupting or requiring fixes for the existing tests. Also, we
have added DAG for end-to-end testing
https://github.com/astronomer/astronomer-cosmos/blob/bd6657a29b111510fc34b2baf0bcc0d65ec0e5b9/dev/dags/simple_dag_async.py

## Configuration

Users need to configure the below param to execute deferrable tasks in
the Cosmos

- [ExecutionMode:
AIRFLOW_ASYNC](https://astronomer.github.io/astronomer-cosmos/getting_started/execution-modes.html)
-
[remote_target_path](https://astronomer.github.io/astronomer-cosmos/configuration/cosmos-conf.html#remote-target-path)
-
[remote_target_path_conn_id](https://astronomer.github.io/astronomer-cosmos/configuration/cosmos-conf.html#remote-target-path-conn-id)

Example DAG:
https://github.com/astronomer/astronomer-cosmos/blob/bd6657a29b111510fc34b2baf0bcc0d65ec0e5b9/dev/dags/simple_dag_async.py

## Installation

You can leverage async operator support by installing an additional
dependency
```
astronomer-cosmos[dbt-bigquery, google]
```


## Documentation 

The PR also document the limitations and uses of Airflow async execution
in the Cosmos.

## Related Issue(s)

Related to: #1120
Closes: #1134

## Breaking Change?

No

## Notes

This is an experimental feature, and as such, it may undergo breaking
changes. We encourage users to share their experiences and feedback to
improve it further.

We'd love support and feedback so we can define the next steps.

## Checklist

- [x] I have made corresponding changes to the documentation (if
required)
- [x] I have added tests that prove my fix is effective or that my
feature works

## Credits

This was a result of teamwork and effort:

Co-authored-by: Pankaj Koti <[email protected]>
Co-authored-by: Tatiana Al-Chueyr <[email protected]>

## Future Work

- Design interface to facilitate the easy addition of new asynchronous
databases operators
#1238
- Improve the test coverage
#1239
- Address the limitations (we need to log these issues)

---------

Co-authored-by: Pankaj Koti <[email protected]>
Co-authored-by: Tatiana Al-Chueyr <[email protected]>
tatiana pushed a commit that referenced this pull request Oct 3, 2024
Following up on the documentation added in PRs #1224 and #1230, this PR
refactors the documentation for Async Execution mode, particularly the
limitations section.

It also addresses a couple of un-rendered items in the scheduling.rst
file, caused by missing blank lines after the code-block directive.
tatiana added a commit that referenced this pull request Oct 4, 2024
New Features

* Introduction of experimental support to run dbt BQ models using Airflow deferrable operators by @pankajkoti @pankajastro @tatiana in #1224 #1230.
  This is a first step in this journey and we would really appreciate feedback from the community.

  For more information, check the documentation: https://astronomer.github.io/astronomer-cosmos/getting_started/execution-modes.html#airflow-async-experimental

  This work has been inspired by the talk "Airflow at Monzo: Evolving our data platform as the bank scales" by
  @jonathanrainer @ed-sparkes given at Airflow Summit 2023: https://airflowsummit.org/sessions/2023/airflow-at-monzo-evolving-our-data-platform-as-the-bank-scales/.

* Support using ``DatasetAlias`` and fix orphaning unreferenced dataset by @tatiana in #1217 #1240

  Documentation: https://astronomer.github.io/astronomer-cosmos/configuration/scheduling.html#data-aware-scheduling

* Add GCP_CLOUD_RUN_JOB execution mode by @ags-de #1153

  Learn more about it: https://astronomer.github.io/astronomer-cosmos/getting_started/gcp-cloud-run-job.html

Enhancements

* Create single virtualenv when ``DbtVirtualenvBaseOperator`` has ``virtualenv_dir=None`` and ``is_virtualenv_dir_temporary=True`` by @kesompochy in #1200
* Consistently handle build and imports in ``cosmos/__init__.py`` by @tatiana in #1215
* Add enum constants to init for direct import by @fabiomx in #1184

Bug fixes

* URL encode dataset names to support multibyte characters by @t0momi219 in #1198
* Fix invalid argument (``full_refresh``) passed to DbtTestAwsEksOperator (and others) by @johnhoran in #1175
* Fix ``printer_width`` arg type in ``DbtProfileConfigVars`` by @jessicaschueler in #1191
* Fix task owner fallback by @jmaicher in #1195

Docs

* Add scarf to readme and docs for website analytics by @cmarteepants in #1221
* Add ``virtualenv_dir`` param to ``ExecutionConfig`` docs by @pankajkoti in #1173
* Give credits to @LennartKloppenburg in CHANGELOG.rst by @tatiana #1174
* Refactor docs for async mode execution by @pankajkoti in #1241

Others

* Remove PR branch added for testing a change in CI in #1224 by @pankajkoti in #1233
* Fix CI wrt broken coverage upload artifact @pankajkoti in #1210
* Fix CI issues - Upgrade actions/upload-artifact & actions/download-artifact to v4 and set min version for packaging by @pankajkoti in #1208
* Resolve CI failures for Apache Airflow 2.7 jobs by @pankajkoti in #1182
* CI: Update GCP manifest file path based on new secret update by @pankajkoti in #1237
* Pre-commit hook updates in #1176 #1186, #1186, #1201, #1219, #1231
tatiana added a commit that referenced this pull request Oct 4, 2024
New Features

* Introduction of experimental support to run dbt BQ models using Airflow deferrable operators by @pankajkoti @pankajastro @tatiana in #1224 #1230.
  This is a first step in this journey and we would really appreciate feedback from the community.

  For more information, check the documentation: https://astronomer.github.io/astronomer-cosmos/getting_started/execution-modes.html#airflow-async-experimental

  This work has been inspired by the talk "Airflow at Monzo: Evolving our data platform as the bank scales" by
  @jonathanrainer @ed-sparkes given at Airflow Summit 2023: https://airflowsummit.org/sessions/2023/airflow-at-monzo-evolving-our-data-platform-as-the-bank-scales/.

* Support using ``DatasetAlias`` and fix orphaning unreferenced dataset by @tatiana in #1217 #1240

  Documentation: https://astronomer.github.io/astronomer-cosmos/configuration/scheduling.html#data-aware-scheduling

* Add GCP_CLOUD_RUN_JOB execution mode by @ags-de #1153

  Learn more about it: https://astronomer.github.io/astronomer-cosmos/getting_started/gcp-cloud-run-job.html

Enhancements

* Create single virtualenv when ``DbtVirtualenvBaseOperator`` has ``virtualenv_dir=None`` and ``is_virtualenv_dir_temporary=True`` by @kesompochy in #1200
* Consistently handle build and imports in ``cosmos/__init__.py`` by @tatiana in #1215
* Add enum constants to init for direct import by @fabiomx in #1184

Bug fixes

* URL encode dataset names to support multibyte characters by @t0momi219 in #1198
* Fix invalid argument (``full_refresh``) passed to DbtTestAwsEksOperator (and others) by @johnhoran in #1175
* Fix ``printer_width`` arg type in ``DbtProfileConfigVars`` by @jessicaschueler in #1191
* Fix task owner fallback by @jmaicher in #1195

Docs

* Add scarf to readme and docs for website analytics by @cmarteepants in #1221
* Add ``virtualenv_dir`` param to ``ExecutionConfig`` docs by @pankajkoti in #1173
* Give credits to @LennartKloppenburg in CHANGELOG.rst by @tatiana #1174
* Refactor docs for async mode execution by @pankajkoti in #1241

Others

* Remove PR branch added for testing a change in CI in #1224 by @pankajkoti in #1233
* Fix CI wrt broken coverage upload artifact @pankajkoti in #1210
* Fix CI issues - Upgrade actions/upload-artifact & actions/download-artifact to v4 and set min version for packaging by @pankajkoti in #1208
* Resolve CI failures for Apache Airflow 2.7 jobs by @pankajkoti in #1182
* CI: Update GCP manifest file path based on new secret update by @pankajkoti in #1237
* Pre-commit hook updates in #1176 #1186, #1186, #1201, #1219, #1231
tatiana added a commit that referenced this pull request Oct 4, 2024
**New Features**

* Support using ``DatasetAlias`` and fix orphaning unreferenced dataset
by @tatiana in #1217 #1240

Documentation:
https://astronomer.github.io/astronomer-cosmos/configuration/scheduling.html#data-aware-scheduling

* Add GCP_CLOUD_RUN_JOB execution mode by @ags-de #1153

Learn more about it:
https://astronomer.github.io/astronomer-cosmos/getting_started/gcp-cloud-run-job.html

* Introduction of experimental support to run dbt BQ models using
Airflow deferrable operators by @pankajkoti @pankajastro @tatiana in
#1224 #1230.

This is the first step in the journey of running dbt resources with
native Airflow, and we would appreciate feedback from the community.

For more information, check the documentation:
https://astronomer.github.io/astronomer-cosmos/getting_started/execution-modes.html#airflow-async-experimental

This work has been inspired by the talk "Airflow at Monzo: Evolving our
data platform as the bank scales" by
@jonathanrainer @ed-sparkes given at Airflow Summit 2023:
https://airflowsummit.org/sessions/2023/airflow-at-monzo-evolving-our-data-platform-as-the-bank-scales/.


**Enhancements**

* Create single virtualenv when ``DbtVirtualenvBaseOperator`` has
``virtualenv_dir=None`` and ``is_virtualenv_dir_temporary=True`` by
@kesompochy in #1200
* Consistently handle build and imports in ``cosmos/__init__.py`` by
@tatiana in #1215
* Add enum constants to init for direct import by @fabiomx in #1184

**Bug fixes**

* URL encode dataset names to support multibyte characters by @t0momi219
in #1198
* Fix invalid argument (``full_refresh``) passed to
DbtTestAwsEksOperator (and others) by @johnhoran in #1175
* Fix ``printer_width`` arg type in ``DbtProfileConfigVars`` by
@jessicaschueler in #1191
* Fix task owner fallback by @jmaicher in #1195

**Docs**

* Add scarf to readme and docs for website analytics by @cmarteepants in
#1221
* Add ``virtualenv_dir`` param to ``ExecutionConfig`` docs by
@pankajkoti in #1173
* Give credits to @LennartKloppenburg in CHANGELOG.rst by @tatiana #1174
* Refactor docs for async mode execution by @pankajkoti in #1241

Others

* Remove PR branch added for testing a change in CI in #1224 by
@pankajkoti in #1233
* Fix CI wrt broken coverage upload artifact @pankajkoti in #1210
* Fix CI issues - Upgrade actions/upload-artifact &
actions/download-artifact to v4 and set min version for packaging by
@pankajkoti in #1208
* Resolve CI failures for Apache Airflow 2.7 jobs by @pankajkoti in
#1182
* CI: Update GCP manifest file path based on new secret update by
@pankajkoti in #1237
* Pre-commit hook updates in #1176 #1186, #1186, #1201, #1219, #1231

---------

Co-authored-by: Pankaj Koti <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:config Related to configuration, like YAML files, environment variables, or executer configuration area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc dbt:compile Primarily related to dbt compile command or functionality lgtm This PR has been approved by a maintainer size:XL This PR changes 500-999 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants