Skip to content

Commit

Permalink
Merge branch 'apache:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
LefterisXefteris authored Nov 8, 2024
2 parents ed4e607 + a212bf8 commit 6088b73
Show file tree
Hide file tree
Showing 73 changed files with 979 additions and 391 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.4.30
ARG AIRFLOW_UV_VERSION=0.5.0
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -1373,7 +1373,7 @@ RUN bash /scripts/docker/install_packaging_tools.sh; \
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.4.30
ARG AIRFLOW_UV_VERSION=0.5.0

ENV AIRFLOW_PIP_VERSION=${AIRFLOW_PIP_VERSION} \
AIRFLOW_UV_VERSION=${AIRFLOW_UV_VERSION}
Expand Down
15 changes: 15 additions & 0 deletions airflow/api_fastapi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
from __future__ import annotations

import logging
from contextlib import AsyncExitStack, asynccontextmanager

from fastapi import FastAPI
from starlette.routing import Mount

from airflow.api_fastapi.core_api.app import init_config, init_dag_bag, init_plugins, init_views
from airflow.api_fastapi.execution_api.app import create_task_execution_api_app
Expand All @@ -28,6 +30,18 @@
app: FastAPI | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
async with AsyncExitStack() as stack:
for route in app.routes:
if isinstance(route, Mount) and isinstance(route.app, FastAPI):
await stack.enter_async_context(
route.app.router.lifespan_context(route.app),
)
app.state.lifespan_called = True
yield


def create_app(apps: str = "all") -> FastAPI:
apps_list = apps.split(",") if apps else ["all"]

Expand All @@ -36,6 +50,7 @@ def create_app(apps: str = "all") -> FastAPI:
description="Airflow API. All endpoints located under ``/public`` can be used safely, are stable and backward compatible. "
"Endpoints located under ``/ui`` are dedicated to the UI and are subject to breaking change "
"depending on the need of the frontend. Users should not rely on those but use the public ones instead.",
lifespan=lifespan,
)

if "core" in apps_list or "all" in apps_list:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class DAGRunPatchStates(str, Enum):
class DAGRunPatchBody(BaseModel):
"""DAG Run Serializer for PATCH requests."""

state: DAGRunPatchStates
state: DAGRunPatchStates | None = None
note: str | None = Field(None, max_length=1000)


class DAGRunResponse(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

from pydantic import AliasPath, BaseModel, BeforeValidator, ConfigDict, Field

from airflow.api_fastapi.core_api.serializers.job import JobResponse
from airflow.api_fastapi.core_api.serializers.trigger import TriggerResponse
from airflow.api_fastapi.core_api.datamodels.job import JobResponse
from airflow.api_fastapi.core_api.datamodels.trigger import TriggerResponse
from airflow.utils.state import TaskInstanceState


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

from pydantic import BaseModel

from airflow.api_fastapi.core_api.serializers.dag_run import DAGRunResponse
from airflow.api_fastapi.core_api.serializers.dags import DAGResponse
from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse


class DAGWithLatestDagRunsResponse(DAGResponse):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class DAGRunStates(BaseModel):
failed: int


class TaskInstanceState(BaseModel):
class TaskInstanceStateCount(BaseModel):
"""TaskInstance serializer for responses."""

no_status: int
Expand All @@ -60,4 +60,4 @@ class HistoricalMetricDataResponse(BaseModel):

dag_run_types: DAGRunTypes
dag_run_states: DAGRunStates
task_instance_states: TaskInstanceState
task_instance_states: TaskInstanceStateCount
176 changes: 91 additions & 85 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1318,9 +1318,9 @@ paths:
patch:
tags:
- DagRun
summary: Patch Dag Run State
summary: Patch Dag Run
description: Modify a DAG Run.
operationId: patch_dag_run_state
operationId: patch_dag_run
parameters:
- name: dag_id
in: path
Expand Down Expand Up @@ -3694,10 +3694,16 @@ components:
DAGRunPatchBody:
properties:
state:
$ref: '#/components/schemas/DAGRunPatchStates'
anyOf:
- $ref: '#/components/schemas/DAGRunPatchStates'
- type: 'null'
note:
anyOf:
- type: string
maxLength: 1000
- type: 'null'
title: Note
type: object
required:
- state
title: DAGRunPatchBody
description: DAG Run Serializer for PATCH requests.
DAGRunPatchStates:
Expand Down Expand Up @@ -4330,7 +4336,7 @@ components:
dag_run_states:
$ref: '#/components/schemas/DAGRunStates'
task_instance_states:
$ref: '#/components/schemas/airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState'
$ref: '#/components/schemas/TaskInstanceStateCount'
type: object
required:
- dag_run_types
Expand Down Expand Up @@ -4792,7 +4798,7 @@ components:
title: Duration
state:
anyOf:
- $ref: '#/components/schemas/airflow__utils__state__TaskInstanceState'
- $ref: '#/components/schemas/TaskInstanceState'
- type: 'null'
try_number:
type: integer
Expand Down Expand Up @@ -4907,6 +4913,84 @@ components:
- triggerer_job
title: TaskInstanceResponse
description: TaskInstance serializer for responses.
TaskInstanceState:
type: string
enum:
- removed
- scheduled
- queued
- running
- success
- restarting
- failed
- up_for_retry
- up_for_reschedule
- upstream_failed
- skipped
- deferred
title: TaskInstanceState
description: 'All possible states that a Task Instance can be in.
Note that None is also allowed, so always use this in a type hint with Optional.'
TaskInstanceStateCount:
properties:
no_status:
type: integer
title: No Status
removed:
type: integer
title: Removed
scheduled:
type: integer
title: Scheduled
queued:
type: integer
title: Queued
running:
type: integer
title: Running
success:
type: integer
title: Success
restarting:
type: integer
title: Restarting
failed:
type: integer
title: Failed
up_for_retry:
type: integer
title: Up For Retry
up_for_reschedule:
type: integer
title: Up For Reschedule
upstream_failed:
type: integer
title: Upstream Failed
skipped:
type: integer
title: Skipped
deferred:
type: integer
title: Deferred
type: object
required:
- no_status
- removed
- scheduled
- queued
- running
- success
- restarting
- failed
- up_for_retry
- up_for_reschedule
- upstream_failed
- skipped
- deferred
title: TaskInstanceStateCount
description: TaskInstance serializer for responses.
TriggerResponse:
properties:
id:
Expand Down Expand Up @@ -5050,81 +5134,3 @@ components:
- git_version
title: VersionInfo
description: Version information serializer for responses.
airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState:
properties:
no_status:
type: integer
title: No Status
removed:
type: integer
title: Removed
scheduled:
type: integer
title: Scheduled
queued:
type: integer
title: Queued
running:
type: integer
title: Running
success:
type: integer
title: Success
restarting:
type: integer
title: Restarting
failed:
type: integer
title: Failed
up_for_retry:
type: integer
title: Up For Retry
up_for_reschedule:
type: integer
title: Up For Reschedule
upstream_failed:
type: integer
title: Upstream Failed
skipped:
type: integer
title: Skipped
deferred:
type: integer
title: Deferred
type: object
required:
- no_status
- removed
- scheduled
- queued
- running
- success
- restarting
- failed
- up_for_retry
- up_for_reschedule
- upstream_failed
- skipped
- deferred
title: TaskInstanceState
description: TaskInstance serializer for responses.
airflow__utils__state__TaskInstanceState:
type: string
enum:
- removed
- scheduled
- queued
- running
- success
- restarting
- failed
- up_for_retry
- up_for_reschedule
- upstream_failed
- skipped
- deferred
title: TaskInstanceState
description: 'All possible states that a Task Instance can be in.
Note that None is also allowed, so always use this in a type hint with Optional.'
20 changes: 10 additions & 10 deletions airflow/api_fastapi/core_api/routes/public/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
from airflow.api_fastapi.core_api.serializers.backfills import (
from airflow.api_fastapi.core_api.datamodels.backfills import (
BackfillCollectionResponse,
BackfillPostBody,
BackfillResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
from airflow.models import DagRun
from airflow.models.backfill import (
AlreadyRunningBackfill,
Expand All @@ -49,7 +49,7 @@
path="/",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def list_backfills(
def list_backfills(
dag_id: str,
limit: QueryLimit,
offset: QueryOffset,
Expand Down Expand Up @@ -81,7 +81,7 @@ async def list_backfills(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_backfill(
def get_backfill(
backfill_id: str,
session: Annotated[Session, Depends(get_session)],
):
Expand All @@ -102,7 +102,7 @@ async def get_backfill(
]
),
)
async def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
b = session.get(Backfill, backfill_id)
if not b:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find backfill with id {backfill_id}")
Expand All @@ -125,7 +125,7 @@ async def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get
]
),
)
async def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
b = session.get(Backfill, backfill_id)
if not b:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find backfill with id {backfill_id}")
Expand All @@ -147,7 +147,7 @@ async def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(g
]
),
)
async def cancel_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
def cancel_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
b: Backfill = session.get(Backfill, backfill_id)
if not b:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find backfill with id {backfill_id}")
Expand Down Expand Up @@ -194,7 +194,7 @@ async def cancel_backfill(*, backfill_id, session: Annotated[Session, Depends(ge
]
),
)
async def create_backfill(
def create_backfill(
backfill_request: BackfillPostBody,
):
from_date = timezone.coerce_datetime(backfill_request.from_date)
Expand Down
Loading

0 comments on commit 6088b73

Please sign in to comment.