Hi there! We're thrilled that you'd like to contribute to this project. Your help is essential for keeping it great.
Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms.
We will only create Async operators for the "sync-version" of operators that do some level of polling (take more than a few seconds to complete).
For example, we won’t create an async operator for `BigQueryCreateEmptyTableOperator`, but we will create one for `BigQueryInsertJobOperator`, which actually runs queries and can, in the worst case, take hours to complete.
If you have suggestions for how this project could be improved, or want to report a bug, open an issue! We'd love any and all contributions. If you have questions, we'd love to hear them too.
We'd also love PRs. If you're thinking of a large PR, we advise opening up an issue first to talk about it, though! Look at the links below if you're not sure how to open a PR.
If you have other questions, use GitHub Discussions.
You can configure the Docker-based development environment as follows:
- Install the latest versions of Docker Community Edition and Docker Compose and add them to the `PATH`.
- Create a local virtualenv, for example: `python3 -m venv myenv`
- Initialize and activate the created environment: `source myenv/bin/activate`
- Open your IDE (for example, PyCharm) and select the virtualenv you created as the project's default virtualenv in your IDE.
- Install the pre-commit framework with `pip install pre-commit`.
- Run `pre-commit install` to ensure that pre-commit hooks are executed on every commit.
- Put the DAGs you want to run in the `dev/dags` directory.
- If you want to add Connections, create a `connections.yaml` file in the `dev` directory. See the Connections Guide for more information.
Example:
```yaml
druid_broker_default:
  conn_type: druid
  extra: '{"endpoint": "druid/v2/sql"}'
  host: druid-broker
  login: null
  password: null
  port: 8082
  schema: null

airflow_db:
  conn_type: mysql
  extra: null
  host: mysql
  login: root
  password: plainpassword
  port: null
  schema: airflow
```
The following commands are available to run from the root of the repository.
- `make dev` - To create a development environment using the docker-compose file.
- `make logs` - To view the logs of all the containers.
- `make stop` - To stop all the containers.
- `make clean` - To remove all the containers along with volumes.
- `make docs` - To build the docs using Sphinx.
- `make help` - To view the available commands.
- `make build` - To re-build the docker image.
- `make build-run` - To build the docker image and then run the containers.
- `make restart` - To restart the Scheduler & Triggerer containers.
- `make restart-all` - To restart all the containers.
- `make run-mypy` - To run MyPy checks.
- `make run-tests` - To run CI tests.
- `make run-static-checks` - To run CI static code checks.
- `make shell` - To run bash/shell within a container (allows an interactive session).
The following ports are accessible from the host machine:

- `8080` - Webserver
- `5555` - Flower
- `5432` - Postgres
Dev directories:

- `dev/dags/` - DAG files
- `dev/logs/` - Log files of the Airflow containers
- Update the local sources to address the issue you are working on.
- Make sure your fork's main is synced with Astronomer Providers' main before you create a branch. See How to sync your fork for details.
- Create a local branch for your development. Make sure to use the latest `astronomer-providers/main` as the base for the branch. This allows you to easily compare changes, work on several changes at the same time, and more.
- Add the necessary code and unit tests.
- Run the unit tests from the IDE or local virtualenv as you see fit.
- Ensure test coverage is above 90% for each of the files that you are changing.
- Run and fix all the static checks. If you have pre-commit hooks installed, this step runs automatically while you are committing your code. If not, you can run it manually via `git add` and then `pre-commit run`.
- Remember to keep your branches up to date with the `main` branch, squash commits, and resolve all conflicts.
- Re-run static code checks again.
- Make sure your commit has a good title and description of the context of your change, enough for the committer reviewing it to understand why you are proposing the change. Make sure to follow the other guidelines described in the pull request guidelines.
- Create the pull request!
Before you submit a pull request (PR), check that it meets these guidelines:
- Include unit tests and example DAGs (wherever applicable) in your pull request. This helps make sure you do not break the build with your PR and helps increase coverage.
- Rebase your fork, and resolve all conflicts.
- When merging PRs, the committer will use Squash and Merge, which means your PR will be merged as one commit, regardless of the number of commits in your PR. During the review cycle, you can keep a commit history for easier review, but if you need to, you can also squash all commits to reduce the maintenance burden during rebase.
- If your pull request adds functionality, make sure to update the docs as part of the same PR. A docstring is often sufficient. Make sure to follow the Sphinx-compatible standards.
- Run tests locally before opening a PR.
- Adhere to guidelines for commit messages described in this article. This makes the lives of those who come after you a lot easier.
We check our code quality via static code checks. The static code checks in astronomer-providers are used to verify that the code meets certain quality standards. All the static code checks can be run through pre-commit hooks.
Your code must pass all the static code checks in the CI in order to be eligible for Code Review. The easiest way to make sure your code is good before pushing is to use pre-commit checks locally as described in the static code checks documentation.
You can also run some static code checks via the make command using the available bash scripts.
make run-static-checks
Pre-commit hooks help speed up your local development cycle and place less burden on the CI infrastructure. Consider installing the pre-commit hooks as a necessary prerequisite.
By default, the pre-commit hooks only check the files you are currently working on, which makes them fast. Yet, these checks use exactly the same environment as the CI tests use. So, you can be sure your modifications will also work for CI if they pass pre-commit hooks.
We have integrated the fantastic pre-commit framework in our development workflow. To install and use it, you need at least Python 3.7 locally.
It is best to use pre-commit hooks when you have your local virtualenv or conda environment for astronomer-providers activated, since then the pre-commit hooks and other dependencies are installed automatically. You can also install the pre-commit hooks manually using `pip`:
pip install pre-commit
After installation, pre-commit hooks run automatically when you commit code, and they only run on the files that you changed in your commit, so they are usually pretty fast and do not slow down your iteration speed. You can also disable the pre-commit hooks temporarily when you commit your code with the `--no-verify` switch, or skip certain checks that you find too disruptive to your local workflow.
To turn on pre-commit checks for commit
operations in git, enter:
pre-commit install
To install the checks also for pre-push
operations, enter:
pre-commit install -t pre-push
For details on advanced usage of the install method, use:
pre-commit install --help
Most of our coding style rules are enforced programmatically by flake8 and mypy (which are run automatically on every pull request), but there are some rules that are not yet automated and are more Airflow specific or semantic than style.
- Class names contain 'Operator', 'Hook', 'Sensor', or 'Trigger' - for example `BigQueryInsertJobOperatorAsync`, `BigQueryHookAsync`.
- Operator names usually follow the convention `<Subject><Action><Entity>OperatorAsync` (`BigQueryInsertJobOperatorAsync` is a good example). A skeleton illustrating these naming conventions is shown after this list.
- Tests are grouped in parallel packages under the "tests" top-level package. The module name is usually `test_<object_to_test>.py`.
- System/example test DAGs are placed under the `example_dags` folder within the respective folders.
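A purely hypothetical skeleton showing how these names line up ("Widget" is an invented subject, not a real provider; the base-class imports are the standard Airflow ones):

```python
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.triggers.base import BaseTrigger


class WidgetHookAsync(BaseHook):
    """Hook class names end with 'HookAsync'."""


class WidgetJobTrigger(BaseTrigger):
    """Trigger class names end with 'Trigger'."""


class WidgetRunJobOperatorAsync(BaseOperator):
    """Operator names follow <Subject><Action><Entity>OperatorAsync."""


# The corresponding tests would live under the parallel "tests" package,
# e.g. tests/widget/operators/test_widget.py
```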
- The example DAG should be self-sufficient, as it is tested as part of the CI. For example, while implementing the example DAG for `S3KeySensorAsync`, the DAG should first create the bucket, then upload the S3 key, then check for the key using `S3KeySensorAsync`, and finally delete the bucket once the sensor has found the key (see the sketch after this list).
- Read resource names for clusters, databases, files, etc. from environment variables and avoid hard-coding them.
- Read the Airflow connection ID from an environment variable and pass the value to the respective task. We recommend not depending on the default connection ID.
- Since we trigger the DAGs externally, set the DAG `schedule` parameter to `None` to avoid unnecessary runs.
- Set the DAG `catchup` parameter to `False` to avoid any backfill.
- Add appropriate tags to the DAG. This helps to group and search DAGs effectively.
- Do not mention AWS keys or credentials in the example DAG.
- Add proper docstrings as part of the example DAG.
- Always include a long-running query in the example DAG.
- Include a clean-up step at the start of the example DAG so that there won't be failures if the resources are already present.
- Set the `trigger_rule` parameter to `all_done` for the clean-up task so that clean-up happens irrespective of upstream task failures. Run all the steps in the example DAG even if a particular task fails.
- Make sure to register the DAG in the master DAG so that it won't be skipped in CI.
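Below is a minimal sketch of an example DAG structured along these lines. The task names, environment variables, and DAG id are invented for illustration; real example DAGs in this repository are more involved:

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical resource names read from environment variables instead of being hard-coded.
BUCKET_NAME = os.environ.get("EXAMPLE_BUCKET_NAME", "example-bucket")
CONN_ID = os.environ.get("EXAMPLE_CONN_ID", "example_conn")  # would be passed to the async tasks


def _cleanup() -> None:
    """Placeholder clean-up step; a real DAG would delete the bucket/cluster here."""
    print(f"cleaning up {BUCKET_NAME}")


with DAG(
    dag_id="example_widget_async",
    start_date=datetime(2022, 1, 1),
    schedule=None,  # triggered externally, so no schedule ('schedule_interval' on older Airflow)
    catchup=False,  # avoid any backfill
    tags=["example", "async"],
) as dag:
    # Clean up at the start so reruns do not fail if resources already exist.
    initial_cleanup = PythonOperator(task_id="initial_cleanup", python_callable=_cleanup)

    # ... tasks that create resources and run a long-running query via the async operator ...

    # Final clean-up runs irrespective of upstream task failures.
    final_cleanup = PythonOperator(
        task_id="final_cleanup",
        python_callable=_cleanup,
        trigger_rule="all_done",
    )

    initial_cleanup >> final_cleanup
```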
- Writing a deferrable or async operator takes a bit more work. There are some main points to consider:
- Deferrable Operators & Triggers rely on more recent asyncio features, and as a result only work on Python 3.7 or higher.
- Any Deferrable Operator implementation needs the API used to give you a unique identifier in order to poll for the status in the Trigger. This does not affect creating an async Sensor, as Sensors are just poll-based whereas Operators are a "submit + poll" operation. For example, in the code snippet below, the Google BigQuery API returns a `job_id` with which we can track the status of the job execution from the Trigger.
```python
job = self._submit_job(hook, configuration=configuration)
self.job_id = job.job_id
self.defer(
    timeout=self.execution_timeout,
    trigger=BigQueryGetDataTrigger(
        conn_id=self.gcp_conn_id,
        job_id=self.job_id,
        dataset_id=self.dataset_id,
        table_id=self.table_id,
        project_id=hook.project_id,
    ),
    method_name="execute_complete",
)
```
- Your Operator must defer itself with a Trigger. If there is a Trigger in core Airflow you can use, great; otherwise, you will have to write one.
- Your Operator will be stopped and removed from its worker while deferred, and no state will persist automatically. You can persist state by asking Airflow to resume you at a certain method or pass certain kwargs, but that’s it.
- You can defer multiple times, and you can defer before/after your Operator does significant work, or only defer if certain conditions are met (e.g. a system does not have an immediate answer). Deferral is entirely under your control.
- Any Operator can defer; no special marking on its class is needed, and it’s not limited to Sensors.
- If you want to trigger deferral, at any place in your Operator you can call `self.defer(trigger, method_name, kwargs, timeout)`, which will raise a special exception that Airflow will catch. The arguments are:
  - `trigger`: An instance of a Trigger that you wish to defer on. It will be serialized into the database.
  - `method_name`: The method name on your Operator that you want Airflow to call when it resumes.
  - `kwargs`: Additional keyword arguments to pass to the method when it is called. Optional, defaults to `{}`.
  - `timeout`: A timedelta that specifies a timeout after which this deferral will fail, and fail the task instance. Optional, defaults to `None`, meaning no timeout.
- A Trigger is written as a class that inherits from `BaseTrigger` and implements three methods (a minimal sketch is shown after this list):
  - `__init__`, to receive arguments from Operators instantiating it.
  - `run`, an asynchronous method that runs its logic and yields one or more `TriggerEvent` instances as an asynchronous generator.
  - `serialize`, which returns the information needed to re-construct this trigger, as a tuple of the classpath and keyword arguments to pass to `__init__`.
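A minimal sketch of such a Trigger, assuming a hypothetical job-status API (`_get_job_status`, the classpath, and the polling details are invented for illustration; real triggers in this repository, such as `S3KeyTrigger`, follow the same shape):

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Tuple

from airflow.triggers.base import BaseTrigger, TriggerEvent


class ExampleJobTrigger(BaseTrigger):
    """Hypothetical trigger that polls a job until it reaches a terminal state."""

    def __init__(self, job_id: str, poll_interval: float = 4.0):
        super().__init__()
        self.job_id = job_id
        self.poll_interval = poll_interval

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # Classpath plus the kwargs needed to re-create this instance in the triggerer.
        return (
            "example.triggers.ExampleJobTrigger",  # illustrative classpath only
            {"job_id": self.job_id, "poll_interval": self.poll_interval},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # Poll asynchronously and *yield* (not return) a TriggerEvent when done.
        while True:
            status = await self._get_job_status()  # invented async API call
            if status in ("success", "error"):
                yield TriggerEvent({"job_id": self.job_id, "status": status})
                return
            await asyncio.sleep(self.poll_interval)

    async def _get_job_status(self) -> str:
        # Placeholder for a real async client call.
        return "success"
```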
- There are also some design constraints in the Trigger to be aware of:
  - From the Operator we cannot pass a class object to the Trigger, because the `serialize` method only supports JSON-serializable values.
  - The `run` method must be asynchronous (using Python's asyncio) and must correctly `await` whenever it does a blocking operation.
  - `run` must `yield` its TriggerEvents, not return them. If it returns before yielding at least one event, Airflow will consider this an error and fail any Task Instances waiting on it. If it throws an exception, Airflow will also fail any dependent task instances.
  - You should assume that a trigger instance may run more than once (this can happen if a network partition occurs and Airflow re-launches a trigger on a separate machine), so you must be mindful about side effects. For example, you might not want to use a trigger to insert database rows.
  - If your trigger is designed to emit more than one event (not currently supported), then each emitted event must contain a payload that can be used to deduplicate events if the trigger is being run in multiple places. If you only fire one event and don't need to pass information back to the Operator, you can just set the payload to `None`.
  - A trigger may be suddenly removed from one triggerer service and started on a new one, for example if subnets are changed and a network partition results, or if there is a deployment. If desired, you may implement the `cleanup` method, which is always called after `run`, whether the trigger exits cleanly or otherwise.
The async version of the operator should ideally be easily swappable, and no DAG-facing changes should be required apart from changing the import path.
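For example, switching a DAG from the sync operator to the async one should, ideally, only require swapping the import (a sketch; refer to each operator's documentation for the exact module path):

```python
# Sync version from the upstream Google provider:
# from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# Async version from astronomer-providers; the task definition itself stays the same:
from astronomer.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperatorAsync
```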
See if the official library supports async; if not, find a third-party library that supports async calls. For example, `pip install apache-airflow-providers-snowflake` also installs `snowflake-connector-python`, which officially supports async calls to execute queries, so it is used directly to implement deferrable operators for Snowflake. But many providers, such as Amazon, don't come with official async support; in that case, some research to find the right third-party library that supports async calls is important. In the case of Amazon, we use aiobotocore as the async client for Amazon services, built on botocore and aiohttp/asyncio.

Inherit from the sync version of the operator wherever possible so that boilerplate code can be avoided while keeping consistency, and then replace the logic of the `execute` method.
Logging: Pass the status of the task from the Trigger to the Operator or Sensor so that the status shows up in the task logs, since Triggerer logs don't make it to the task logs.
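A minimal sketch of how an Operator can surface the Trigger's status in the task logs, assuming the trigger fires a `TriggerEvent` payload with `status` and `message` keys (the key names and class here are illustrative, not a specific operator from this repository):

```python
from typing import Any, Dict, Optional

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator


class ExampleRunJobOperatorAsync(BaseOperator):
    """Hypothetical async operator; only the resume callback is sketched here."""

    def execute_complete(self, context: Dict[str, Any], event: Optional[Dict[str, Any]] = None) -> None:
        # This method runs on a worker after the trigger fires, so anything
        # logged here ends up in the task logs rather than the triggerer logs.
        if event and event.get("status") == "error":
            raise AirflowException(event.get("message"))
        self.log.info("Job completed with event: %s", event)
```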
- At times the async implementation might need to call a synchronous function. We use the asgiref `sync_to_async` function wrapper for this. `sync_to_async` lets async code call a synchronous function, which is run in a thread pool, with control returned to the async coroutine when the synchronous function completes. For example:

  ```python
  async def service_file_as_context(self) -> Any:  # noqa: D102
      sync_hook = await self.get_sync_hook()
      return await sync_to_async(sync_hook.provide_gcp_credential_file_as_context)()
  ```
- While implementing the trigger's `serialize` method, it is important to use the correct classpath. For example:
```python
def serialize(self) -> Tuple[str, Dict[str, Any]]:
    """Serialize S3KeyTrigger arguments and classpath."""
    return (
        "astronomer.providers.amazon.aws.triggers.s3.S3KeyTrigger",
        {
            "bucket_name": self.bucket_name,
            "bucket_key": self.bucket_key,
            "wildcard_match": self.wildcard_match,
            "aws_conn_id": self.aws_conn_id,
            "hook_params": self.hook_params,
        },
    )
```
- Add the GitHub issue ID as part of the PR.
- Write unit tests which respect the code coverage tolerance.
- Keep Git commit messages aligned with open-source standards.
- Rebase the code from the `main` branch regularly.
- Debugging an example DAG
- Add an Interpreter to PyCharm, pointing the interpreter path to `~/airflow-env/bin/python`, which is the virtual environment `airflow-env` created earlier. To add an Interpreter, go to `File -> Settings -> Project: airflow -> Python Interpreter`.
- In the PyCharm IDE, open the project. The `/dev/dags` directory of the local machine is mounted into the Docker machine by default when Airflow is started, so any DAG file present in this directory is picked up automatically by the scheduler running in the Docker machine and can be seen on `http://127.0.0.1:8080`.
- Copy any example DAG that you have developed to `/dev/dags/`.
- Now this example DAG should be picked up by the local instance of Airflow.
All tests are inside the `./tests` directory.
- Just run `pytest <filepath>` to run the tests. For example:
```text
pytest tests/google/cloud/operators/test_bigquery.py
============================= test session starts ==============================
platform linux -- Python 3.9.10, pytest-7.0.1, pluggy-1.0.0
rootdir: /home/circleci/project, configfile: setup.cfg, testpaths: tests
plugins: anyio-3.5.0, asyncio-0.18.1
asyncio: mode=legacy
collected 6 items

tests/google/cloud/operators/test_bigquery.py ......

======================================== 6 passed in 4.88s ========================================
```