Skip to content

Commit

Permalink
Add basic asyncio support (apache#43944)
Browse files Browse the repository at this point in the history
Add the ability to get an async sqlalchemy session for all 3 dialects.

This is meant to be sort of a hello world for asyncio support in airflow.  It will be refined and extended in the future.  But I think airflow ultimately really needs to go in this direction: in the new REST API, in the new AIP-72 internal API server, in triggers, and ultimately, in the scheduler.

---------

Co-authored-by: Kaxil Naik <[email protected]>
  • Loading branch information
dstandish and kaxil authored Nov 14, 2024
1 parent ab529d1 commit 0393c1f
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 6 deletions.
17 changes: 13 additions & 4 deletions .github/workflows/basic-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,19 @@ jobs:
working-directory: ./clients/python
- name: "Install source version of required packages"
run: |
breeze release-management prepare-provider-packages fab standard common.sql --package-format \
wheel --skip-tag-check --version-suffix-for-pypi dev0
pip install . dist/apache_airflow_providers_fab-*.whl \
dist/apache_airflow_providers_standard-*.whl dist/apache_airflow_providers_common_sql-*.whl
breeze release-management prepare-provider-packages \
fab \
standard \
common.sql \
sqlite \
--package-format wheel \
--skip-tag-check \
--version-suffix-for-pypi dev0
pip install . \
dist/apache_airflow_providers_fab-*.whl \
dist/apache_airflow_providers_standard-*.whl \
dist/apache_airflow_providers_common_sql-*.whl \
dist/apache_airflow_providers_sqlite-*.whl
breeze release-management prepare-task-sdk-package --package-format wheel
pip install ./dist/apache_airflow_task_sdk-*.whl
- name: "Install Python client"
Expand Down
34 changes: 33 additions & 1 deletion airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import pluggy
from packaging.version import Version
from sqlalchemy import create_engine, exc, text
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

Expand Down Expand Up @@ -95,8 +96,17 @@
DONOT_MODIFY_HANDLERS: bool | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
"""
Mapping of sync scheme to async scheme.
:meta private:
"""

engine: Engine
Session: Callable[..., SASession]
async_engine: AsyncEngine
create_async_session: Callable[..., AsyncSession]

# The JSON library to use for DAG Serialization and De-Serialization
json = json
Expand Down Expand Up @@ -199,13 +209,25 @@ def load_policy_plugins(pm: pluggy.PluginManager):
pm.load_setuptools_entrypoints("airflow.policy")


def _get_async_conn_uri_from_sync(sync_uri):
scheme, rest = sync_uri.split(":", maxsplit=1)
scheme = scheme.split("+", maxsplit=1)[0]
aiolib = AIO_LIBS_MAPPING.get(scheme)
if aiolib:
return f"{scheme}+{aiolib}:{rest}"
else:
return sync_uri


def configure_vars():
"""Configure Global Variables from airflow.cfg."""
global SQL_ALCHEMY_CONN
global SQL_ALCHEMY_CONN_ASYNC
global DAGS_FOLDER
global PLUGINS_FOLDER
global DONOT_MODIFY_HANDLERS
SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)

DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

Expand Down Expand Up @@ -441,6 +463,9 @@ def configure_orm(disable_connection_pool=False, pool_class=None):

global Session
global engine
global async_engine
global create_async_session

if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
# Skip DB initialization in unit tests, if DB tests are skipped
Session = SkipDBTestsSession
Expand All @@ -466,7 +491,14 @@ def configure_orm(disable_connection_pool=False, pool_class=None):
connect_args["check_same_thread"] = False

engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True)

async_engine = create_async_engine(SQL_ALCHEMY_CONN_ASYNC, future=True)
create_async_session = sessionmaker(
bind=async_engine,
autocommit=False,
autoflush=False,
class_=AsyncSession,
expire_on_commit=False,
)
mask_secret(engine.url.password)

setup_event_handlers(engine)
Expand Down
2 changes: 2 additions & 0 deletions dev/breeze/tests/test_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def test_get_documentation_package_path():
"""
"apache-airflow-providers-common-sql>=1.20.0b0",
"apache-airflow>=2.8.0b0",
"asyncpg>=0.30.0",
"psycopg2-binary>=2.9.4",
""",
id="beta0 suffix postgres",
Expand All @@ -221,6 +222,7 @@ def test_get_documentation_package_path():
"""
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.8.0",
"asyncpg>=0.30.0",
"psycopg2-binary>=2.9.4",
""",
id="No suffix postgres",
Expand Down
3 changes: 3 additions & 0 deletions generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@
},
"mysql": {
"deps": [
"aiomysql>=0.2.0",
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.8.0",
"mysql-connector-python>=8.0.29",
Expand Down Expand Up @@ -1085,6 +1086,7 @@
"deps": [
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.8.0",
"asyncpg>=0.30.0",
"psycopg2-binary>=2.9.4"
],
"devel-deps": [],
Expand Down Expand Up @@ -1260,6 +1262,7 @@
},
"sqlite": {
"deps": [
"aiosqlite>=0.20.0",
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.8.0"
],
Expand Down
1 change: 1 addition & 0 deletions providers/src/airflow/providers/mysql/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ dependencies:
# Instead, if someone attempts to use it on MacOS, they will get explanatory error on how to install it
- mysqlclient>=1.4.0; sys_platform != 'darwin'
- mysql-connector-python>=8.0.29
- aiomysql>=0.2.0

additional-extras:
# only needed for backwards compatibility
Expand Down
1 change: 1 addition & 0 deletions providers/src/airflow/providers/postgres/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ dependencies:
- apache-airflow>=2.8.0
- apache-airflow-providers-common-sql>=1.20.0
- psycopg2-binary>=2.9.4
- asyncpg>=0.30.0

additional-extras:
- name: amazon
Expand Down
1 change: 1 addition & 0 deletions providers/src/airflow/providers/sqlite/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ versions:

dependencies:
- apache-airflow>=2.8.0
- aiosqlite>=0.20.0
- apache-airflow-providers-common-sql>=1.20.0

integrations:
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/kubernetes/k8s_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
-e .[devel-devscripts,devel-tests,cncf.kubernetes]
-e .[devel-devscripts,devel-tests,cncf.kubernetes,sqlite]
-e ./providers
-e ./task_sdk
12 changes: 12 additions & 0 deletions tests/utils/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
from __future__ import annotations

import pytest
from sqlalchemy import select

from airflow.models import Log
from airflow.utils.session import provide_session

pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
Expand Down Expand Up @@ -53,3 +55,13 @@ def test_provide_session_with_kwargs(self):

session = object()
assert wrapper(session=session) is session

@pytest.mark.asyncio
async def test_async_session(self):
from airflow.settings import create_async_session

session = create_async_session()
session.add(Log(event="hihi1234"))
await session.commit()
l = await session.scalar(select(Log).where(Log.event == "hihi1234").limit(1)) # noqa: E741
assert l.event == "hihi1234"

0 comments on commit 0393c1f

Please sign in to comment.