[async] Design ExecutionMode.AIRFLOW_ASYNC interface to incorporate additional database #1238

Open
pankajastro opened this issue Oct 3, 2024 · 0 comments
Labels
area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc execution:async Related to the Async execution mode profile:bigquery Related to BigQuery ProfileConfig

@pankajastro
Contributor

PR #1230 introduced an experimental version of asynchronous task execution, but it is tightly coupled with BigQuery. The goal of this issue is to refactor that code and design an interface that makes it easy to add asynchronous support for other databases.
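One possible shape for such an interface is a per-database adapter selected by dbt profile type, since PR #1230 already exposes `get_profile_type` on `ProfileConfig` for database selection. The Python below is a minimal sketch under that assumption. `AsyncDatabaseOperatorAdapter`, `register`, `get_adapter` and `BigQueryAdapter` are hypothetical names, not part of Cosmos. The BigQuery adapter only builds a `configuration` payload of the kind `BigQueryInsertJobOperator` accepts, and it does not call Airflow.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, ClassVar


class AsyncDatabaseOperatorAdapter(ABC):
    """Hypothetical per-database plug-in for ExecutionMode.AIRFLOW_ASYNC."""

    # dbt profile type handled by this adapter, e.g. "bigquery" (assumed dispatch key)
    profile_type: ClassVar[str]

    @abstractmethod
    def build_operator_kwargs(self, table: str, compiled_sql: str) -> dict[str, Any]:
        """Translate a precompiled dbt model into kwargs for the database's async operator."""


# Registry of adapters, keyed by dbt profile type.
_ADAPTERS: dict[str, type[AsyncDatabaseOperatorAdapter]] = {}


def register(cls: type[AsyncDatabaseOperatorAdapter]) -> type[AsyncDatabaseOperatorAdapter]:
    """Class decorator that makes an adapter discoverable by its profile type."""
    _ADAPTERS[cls.profile_type] = cls
    return cls


def get_adapter(profile_type: str) -> AsyncDatabaseOperatorAdapter:
    """Pick the adapter for a profile type, e.g. the value of ProfileConfig.get_profile_type()."""
    adapter_cls = _ADAPTERS.get(profile_type)
    if adapter_cls is None:
        supported = ", ".join(sorted(_ADAPTERS)) or "none"
        raise ValueError(
            f"Async execution does not support {profile_type!r} yet (supported: {supported})"
        )
    return adapter_cls()


@register
class BigQueryAdapter(AsyncDatabaseOperatorAdapter):
    profile_type = "bigquery"

    def build_operator_kwargs(self, table: str, compiled_sql: str) -> dict[str, Any]:
        # Shape of a BigQuery query-job `configuration`, as passed to BigQueryInsertJobOperator;
        # the table is rebuilt from the model's SELECT statement.
        select = compiled_sql.strip().rstrip(";")
        query = f"CREATE OR REPLACE TABLE `{table}` AS {select}"
        return {"configuration": {"query": {"query": query, "useLegacySql": False}}}
```

With this design, supporting another warehouse would mean writing one decorated subclass, without touching the code that selects the adapter.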

@dosubot dosubot bot added area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc profile:bigquery Related to BigQuery ProfileConfig labels Oct 3, 2024
tatiana added a commit that referenced this issue Oct 3, 2024
…_ASYNC` (#1230)

Enable BQ users to run dbt models (`full_refresh`) asynchronously. This
releases the Airflow worker node from waiting while the transformation
(I/O) happens in the data warehouse, increasing the overall Airflow task
throughput (more information:
https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html).
As part of this change, we introduce the ability to run the actual SQL
transformations without invoking the dbt command. This also avoids
creating subprocesses in the worker node (`ExecutionMode.LOCAL` with
`InvocationMode.SUBPROCESS` and `ExecutionMode.VIRTUALENV`) or the
overhead of creating a Kubernetes Pod to execute the actual dbt command
(`ExecutionMode.KUBERNETES`), which can help avoid memory and CPU usage
issues.
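Why deferring raises throughput can be illustrated with plain `asyncio`. Airflow's triggerer runs triggers as coroutines on an event loop, so many waits on remote jobs share one process instead of each holding a worker slot. The sketch below is a simplified model under that assumption: `poll_job_until_done` and `run_triggerer` are hypothetical names, and the "jobs" are simulated with timers rather than real BigQuery calls.

```python
from __future__ import annotations

import asyncio
import time


async def poll_job_until_done(job_id: str, duration_s: float, poll_interval_s: float = 0.05) -> str:
    """Stand-in for a trigger: wait on a (simulated) warehouse job without blocking."""
    deadline = time.monotonic() + duration_s  # simulated job runtime, not a real BigQuery call
    while time.monotonic() < deadline:
        # A real trigger would check the job status here; awaiting sleep hands
        # control back to the event loop so the other waits can make progress.
        await asyncio.sleep(poll_interval_s)
    return f"{job_id}:DONE"


async def run_triggerer(durations: dict[str, float]) -> list[str]:
    """One event loop (like Airflow's triggerer process) watches every job concurrently."""
    return list(await asyncio.gather(*(poll_job_until_done(j, d) for j, d in durations.items())))


if __name__ == "__main__":
    jobs = {f"model_{i}": 0.5 for i in range(20)}
    start = time.monotonic()
    results = asyncio.run(run_triggerer(jobs))
    print(f"{len(results)} jobs finished in {time.monotonic() - start:.2f}s")
    # Expect roughly 0.5s overall, versus about 10s if each wait blocked in turn.
```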

This PR builds on an async operator already implemented in the Airflow
repository, extending it in the Cosmos async operator. It also uses the
precompiled SQL generated as part of PR #1224: the generated SQL is
downloaded from a remote location (S3/GCS), which decouples task
execution from dbt.

## Details

- Expose `get_profile_type` on ProfileConfig: This aids in database
selection
- ~Add `async_op_args`: A high-level parameter to forward arguments to
the upstream operator (Airflow operator). (This may change in this PR
itself)~ The async operator params are processed as kwargs in the
`operator_args` parameter
- Implement `DbtRunAirflowAsyncOperator`: This initializes the Airflow
Operator, retrieves the SQL query at task runtime from a remote
location, modifies the query as needed, and triggers the upstream
execute method.
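The three runtime steps of `DbtRunAirflowAsyncOperator` (fetch the SQL, modify it, trigger the upstream operator) can be sketched without Airflow as follows. This is an illustration under stated assumptions, not the Cosmos implementation: the remote path layout, the DROP-then-CREATE rewrite (chosen to mirror the `full_refresh` behaviour described below) and all function names are hypothetical, and the storage reader and `submit` callback are injected stand-ins for S3/GCS access and for the upstream operator.

```python
from __future__ import annotations

from typing import Callable

# Hypothetical reader: takes a URI and returns file contents. In Airflow this role could be
# played by an object-storage client configured from remote_target_path and
# remote_target_path_conn_id; here it is injected so the sketch stays stdlib-only.
Reader = Callable[[str], str]


def fetch_compiled_sql(remote_target_path: str, dag_id: str, model_file: str, read: Reader) -> str:
    """Step 1: retrieve the precompiled model SQL at task runtime."""
    # Assumed layout: <remote_target_path>/<dag_id>/compiled/<model_file>
    uri = f"{remote_target_path.rstrip('/')}/{dag_id}/compiled/{model_file}"
    return read(uri)


def to_full_refresh_statements(table: str, compiled_sql: str) -> list[str]:
    """Step 2: wrap the model's SELECT so it rebuilds the table, like dbt --full-refresh."""
    select = compiled_sql.strip().rstrip(";")
    return [
        f"DROP TABLE IF EXISTS `{table}`",
        f"CREATE TABLE `{table}` AS {select}",
    ]


def run_model_async(table: str, sql: str, submit: Callable[[str], None]) -> None:
    """Step 3: hand each statement to the upstream async operator (stubbed as `submit`)."""
    for statement in to_full_refresh_statements(table, sql):
        submit(statement)


if __name__ == "__main__":
    # In-memory stand-in for a GCS bucket; paths and table names are placeholders.
    fake_bucket = {"gs://my-bucket/target/my_dag/compiled/models/orders.sql": "SELECT 1 AS id;\n"}
    sql = fetch_compiled_sql("gs://my-bucket/target/", "my_dag", "models/orders.sql", fake_bucket.__getitem__)
    run_model_async("my-project.my_dataset.orders", sql, submit=print)
```

Injecting the reader and the submit callback keeps the SQL handling testable in isolation, which is also the kind of seam a database-agnostic interface would need.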

## Limitations

- This feature only works when using Airflow 2.8 and above
- The async execution only works for BigQuery
- The async execution only supports running dbt models (other dbt
resources, such as seeds, sources, snapshots, tests, are run using the
`ExecutionMode.LOCAL`)
- This will work only if the user sets `full_refresh=True` in
`operator_args` (which means tables will be dropped before being
populated, as implemented in `dbt-core`)
- Users need to use `ProfileMapping` in `ProfileConfig`, since Cosmos
relies on the connection (credentials) to run the transformation in BQ
without `dbt-core`
- Users must provide the BQ `location` in `operator_args` (this is a
limitation from the `BigQueryInsertJobOperator` that is being used to
implement the native Airflow asynchronous support)
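The checkable limitations above can be expressed as a pre-flight check. The Python below is a minimal sketch. `check_async_limitations` is a hypothetical helper, not a Cosmos API, and it assumes the Airflow version is passed as an `X.Y[.Z]` string and that the caller already knows whether a `ProfileMapping` is in use.

```python
from __future__ import annotations

from typing import Any


def check_async_limitations(
    operator_args: dict[str, Any],
    uses_profile_mapping: bool,
    airflow_version: str,
) -> list[str]:
    """Return the unmet requirements; an empty list means the DAG fits the limitations above."""
    problems: list[str] = []
    # Compare (major, minor) numerically so that "2.10" sorts after "2.8".
    major, minor = (int(part) for part in airflow_version.split(".")[:2])
    if (major, minor) < (2, 8):
        problems.append(f"Airflow 2.8+ is required (found {airflow_version})")
    if operator_args.get("full_refresh") is not True:
        problems.append("operator_args must set full_refresh=True")
    if not operator_args.get("location"):
        problems.append("operator_args must set the BigQuery location")
    if not uses_profile_mapping:
        problems.append("ProfileConfig must use a ProfileMapping")
    return problems


if __name__ == "__main__":
    print(check_async_limitations({"full_refresh": True, "location": "US"}, True, "2.9.3"))  # []
```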

## Testing 

We have added a new dbt project to the repository to facilitate
asynchronous task execution. The goal is to accelerate development
without disrupting or requiring fixes for the existing tests. We have
also added a DAG for end-to-end testing:
https://github.com/astronomer/astronomer-cosmos/blob/bd6657a29b111510fc34b2baf0bcc0d65ec0e5b9/dev/dags/simple_dag_async.py

## Configuration

Users need to configure the following parameters to execute deferrable
tasks in Cosmos:

- [ExecutionMode.AIRFLOW_ASYNC](https://astronomer.github.io/astronomer-cosmos/getting_started/execution-modes.html)
- [remote_target_path](https://astronomer.github.io/astronomer-cosmos/configuration/cosmos-conf.html#remote-target-path)
- [remote_target_path_conn_id](https://astronomer.github.io/astronomer-cosmos/configuration/cosmos-conf.html#remote-target-path-conn-id)

Example DAG:
https://github.com/astronomer/astronomer-cosmos/blob/bd6657a29b111510fc34b2baf0bcc0d65ec0e5b9/dev/dags/simple_dag_async.py
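The two remote-target settings belong to the `[cosmos]` section of the Airflow configuration, so they can also be supplied through Airflow's standard `AIRFLOW__{SECTION}__{KEY}` environment variables. The Python below is a small sketch that prints those variables. `cosmos_env_vars` is a hypothetical helper, and the bucket URI and connection id are placeholders.

```python
from __future__ import annotations


def cosmos_env_vars(settings: dict[str, str]) -> dict[str, str]:
    """Map [cosmos] config keys to Airflow's AIRFLOW__{SECTION}__{KEY} environment variables."""
    return {f"AIRFLOW__COSMOS__{key.upper()}": value for key, value in settings.items()}


if __name__ == "__main__":
    settings = {
        # Placeholders: point these at your own bucket and Airflow connection.
        "remote_target_path": "gs://my-bucket/target_compiled",
        "remote_target_path_conn_id": "google_cloud_default",
    }
    for name, value in cosmos_env_vars(settings).items():
        print(f"export {name}={value}")
    # export AIRFLOW__COSMOS__REMOTE_TARGET_PATH=gs://my-bucket/target_compiled
    # export AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=google_cloud_default
```

The execution mode itself is chosen per DAG, for example with `ExecutionConfig(execution_mode=ExecutionMode.AIRFLOW_ASYNC)`. The example DAG linked above shows a complete setup.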

## Installation

You can leverage async operator support by installing Cosmos with the
additional `dbt-bigquery` and `google` extras:
```
astronomer-cosmos[dbt-bigquery,google]
```


## Documentation 

The PR also documents the limitations and usage of Airflow async
execution in Cosmos.

## Related Issue(s)

Related to: #1120
Closes: #1134

## Breaking Change?

No

## Notes

This is an experimental feature, and as such, it may undergo breaking
changes. We encourage users to share their experiences and feedback to
improve it further.

We'd love support and feedback so we can define the next steps.

## Checklist

- [x] I have made corresponding changes to the documentation (if
required)
- [x] I have added tests that prove my fix is effective or that my
feature works

## Credits

This was a result of teamwork and effort:

Co-authored-by: Pankaj Koti <[email protected]>
Co-authored-by: Tatiana Al-Chueyr <[email protected]>

## Future Work

- Design an interface to facilitate the easy addition of new asynchronous
database operators
#1238
- Improve the test coverage
#1239
- Address the limitations (we need to log these issues)

@tatiana tatiana added the execution:async Related to the Async execution mode label Oct 21, 2024
@tatiana tatiana changed the title Design ExecutionMode.AIRFLOW_ASYNC interface to incorporate additional database [async] Design ExecutionMode.AIRFLOW_ASYNC interface to incorporate additional database Oct 21, 2024
@tatiana tatiana changed the title [async] Design ExecutionMode.AIRFLOW_ASYNC interface to incorporate additional database [async] Design ExecutionMode.AIRFLOW_ASYNC interface to incorporate additional database Oct 21, 2024
@tatiana tatiana added this to the Cosmos 1.9.0 milestone Oct 30, 2024