Skip to content

Commit

Permalink
AIP-84: Migrating GET queued asset events for DAG to fastAPI (apache#…
Browse files Browse the repository at this point in the history
…44124)

* AIP-84: Migrating GET queued asset events for DAG to fastAPI

* fixing tests and server code

* fixing parameters

* fixing parameters
  • Loading branch information
amoghrajesh authored Nov 18, 2024
1 parent 3f43bc4 commit 7b8bd28
Show file tree
Hide file tree
Showing 13 changed files with 502 additions and 18 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ def delete_dag_asset_queued_event(
)


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@security.requires_access_dag("GET")
@provide_session
Expand Down
24 changes: 23 additions & 1 deletion airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from abc import ABC, abstractmethod
from datetime import datetime
from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar
from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar, Union, overload

from fastapi import Depends, HTTPException, Query
from pendulum.parsing.exceptions import ParserError
Expand Down Expand Up @@ -409,6 +409,27 @@ def _safe_parse_datetime(date_to_check: str) -> datetime:
"""
if not date_to_check:
raise ValueError(f"{date_to_check} cannot be None.")
return _safe_parse_datetime_optional(date_to_check)


@overload
def _safe_parse_datetime_optional(date_to_check: str) -> datetime: ...


@overload
def _safe_parse_datetime_optional(date_to_check: None) -> None: ...


def _safe_parse_datetime_optional(date_to_check: str | None) -> datetime | None:
"""
Parse datetime and raise error for invalid dates.
Allow None values.
:param date_to_check: the string value to be parsed
"""
if date_to_check is None:
return None
try:
return timezone.parse(date_to_check, strict=True)
except (TypeError, ParserError):
Expand Down Expand Up @@ -615,6 +636,7 @@ def depends_float(

# Common Safe DateTime
DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]
OptionalDateTimeQuery = Annotated[Union[str, None], AfterValidator(_safe_parse_datetime_optional)]

# DAG
QueryLimit = Annotated[LimitFilter, Depends(LimitFilter().depends)]
Expand Down
15 changes: 15 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,21 @@ class AssetEventCollectionResponse(BaseModel):
total_entries: int


class QueuedEventResponse(BaseModel):
"""Queued Event serializer for responses.."""

uri: str
dag_id: str
created_at: datetime


class QueuedEventCollectionResponse(BaseModel):
"""Queued Event Collection serializer for responses."""

queued_events: list[QueuedEventResponse]
total_entries: int


class CreateAssetEventsBody(BaseModel):
"""Create asset events request."""

Expand Down
90 changes: 89 additions & 1 deletion airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/:
/public/assets:
get:
tags:
- Asset
Expand Down Expand Up @@ -434,6 +434,59 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/assets/queuedEvent:
get:
tags:
- Asset
summary: Get Dag Asset Queued Events
description: Get queued asset events for a DAG.
operationId: get_dag_asset_queued_events
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: before
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Before
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/QueuedEventCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/backfills/:
get:
tags:
Expand Down Expand Up @@ -5730,6 +5783,41 @@ components:
- version
title: ProviderResponse
description: Provider serializer for responses.
QueuedEventCollectionResponse:
properties:
queued_events:
items:
$ref: '#/components/schemas/QueuedEventResponse'
type: array
title: Queued Events
total_entries:
type: integer
title: Total Entries
type: object
required:
- queued_events
- total_entries
title: QueuedEventCollectionResponse
description: Queued Event Collection serializer for responses.
QueuedEventResponse:
properties:
uri:
type: string
title: Uri
dag_id:
type: string
title: Dag Id
created_at:
type: string
format: date-time
title: Created At
type: object
required:
- uri
- dag_id
- created_at
title: QueuedEventResponse
description: Queued Event serializer for responses..
ReprocessBehavior:
type: string
enum:
Expand Down
81 changes: 75 additions & 6 deletions airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import annotations

from datetime import datetime
from typing import Annotated

from fastapi import Depends, HTTPException, status
Expand All @@ -25,6 +26,7 @@

from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
OptionalDateTimeQuery,
QueryAssetDagIdPatternSearch,
QueryAssetIdFilter,
QueryLimit,
Expand All @@ -43,18 +45,41 @@
AssetEventResponse,
AssetResponse,
CreateAssetEventsBody,
QueuedEventCollectionResponse,
QueuedEventResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.assets import Asset
from airflow.assets.manager import asset_manager
from airflow.models.asset import AssetEvent, AssetModel
from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
from airflow.utils import timezone

assets_router = AirflowRouter(tags=["Asset"], prefix="/assets")
assets_router = AirflowRouter(tags=["Asset"])


def _generate_queued_event_where_clause(
*,
dag_id: str | None = None,
uri: str | None = None,
before: datetime | str | None = None,
) -> list:
"""Get AssetDagRunQueue where clause."""
where_clause = []
if dag_id is not None:
where_clause.append(AssetDagRunQueue.target_dag_id == dag_id)
if uri is not None:
where_clause.append(
AssetDagRunQueue.asset_id.in_(
select(AssetModel.id).where(AssetModel.uri == uri),
),
)
if before is not None:
where_clause.append(AssetDagRunQueue.created_at < before)
return where_clause


@assets_router.get(
"/",
"/assets",
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
def get_assets(
Expand Down Expand Up @@ -89,7 +114,7 @@ def get_assets(


@assets_router.get(
"/events",
"/assets/events",
responses=create_openapi_http_exception_doc([404]),
)
def get_asset_events(
Expand Down Expand Up @@ -139,7 +164,7 @@ def get_asset_events(


@assets_router.post(
"/events",
"/assets/events",
responses=create_openapi_http_exception_doc([404]),
)
def create_asset_event(
Expand All @@ -165,7 +190,7 @@ def create_asset_event(


@assets_router.get(
"/{uri:path}",
"/assets/{uri:path}",
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
def get_asset(
Expand All @@ -183,3 +208,47 @@ def get_asset(
raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found")

return AssetResponse.model_validate(asset, from_attributes=True)


@assets_router.get(
"/dags/{dag_id}/assets/queuedEvent",
responses=create_openapi_http_exception_doc(
[
status.HTTP_404_NOT_FOUND,
]
),
)
def get_dag_asset_queued_events(
dag_id: str,
session: Annotated[Session, Depends(get_session)],
before: OptionalDateTimeQuery = None,
) -> QueuedEventCollectionResponse:
"""Get queued asset events for a DAG."""
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before)
query = (
select(AssetDagRunQueue, AssetModel.uri)
.join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
.where(*where_clause)
)

dag_asset_queued_events_select, total_entries = paginated_select(
query,
[],
)
adrqs = session.execute(dag_asset_queued_events_select).all()

if not adrqs:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found")

queued_events = [
QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri)
for adrq, uri in adrqs
]

return QueuedEventCollectionResponse(
queued_events=[
QueuedEventResponse.model_validate(queued_event, from_attributes=True)
for queued_event in queued_events
],
total_entries=total_entries,
)
22 changes: 22 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,28 @@ export const UseAssetServiceGetAssetKeyFn = (
},
queryKey?: Array<unknown>,
) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])];
export type AssetServiceGetDagAssetQueuedEventsDefaultResponse = Awaited<
ReturnType<typeof AssetService.getDagAssetQueuedEvents>
>;
export type AssetServiceGetDagAssetQueuedEventsQueryResult<
TData = AssetServiceGetDagAssetQueuedEventsDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useAssetServiceGetDagAssetQueuedEventsKey =
"AssetServiceGetDagAssetQueuedEvents";
export const UseAssetServiceGetDagAssetQueuedEventsKeyFn = (
{
before,
dagId,
}: {
before?: string;
dagId: string;
},
queryKey?: Array<unknown>,
) => [
useAssetServiceGetDagAssetQueuedEventsKey,
...(queryKey ?? [{ before, dagId }]),
];
export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<
ReturnType<typeof DashboardService.historicalMetrics>
>;
Expand Down
26 changes: 26 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,32 @@ export const prefetchUseAssetServiceGetAsset = (
queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }),
queryFn: () => AssetService.getAsset({ uri }),
});
/**
* Get Dag Asset Queued Events
* Get queued asset events for a DAG.
* @param data The data for the request.
* @param data.dagId
* @param data.before
* @returns QueuedEventCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseAssetServiceGetDagAssetQueuedEvents = (
queryClient: QueryClient,
{
before,
dagId,
}: {
before?: string;
dagId: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn({
before,
dagId,
}),
queryFn: () => AssetService.getDagAssetQueuedEvents({ before, dagId }),
});
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
Loading

0 comments on commit 7b8bd28

Please sign in to comment.