Skip to content

Commit

Permalink
fix: log action get the correct request body
Browse files Browse the repository at this point in the history
  • Loading branch information
luoyuliuyin committed Jan 10, 2025
1 parent fd31662 commit 8b77667
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/www/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def wrapper(*args, **kwargs):
user_display = get_auth_manager().get_user_display_name()

isAPIRequest = request.blueprint == "/api/v1"
hasJsonBody = request.headers.get("content-type") == "application/json" and request.json
hasJsonBody = "application/json" in request.headers.get("content-type") and request.json

fields_skip_logging = {
"csrf_token",
Expand Down
44 changes: 44 additions & 0 deletions tests/api_connexion/endpoints/test_dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,50 @@ def test_should_respond_200(self, state, run_type, dag_maker, session):
assert response.status_code == 200
assert response.json == expected_response_json

@pytest.mark.parametrize("state", ["failed", "success", "queued"])
@pytest.mark.parametrize("run_type", [state.value for state in DagRunType])
def test_action_logging(self, state, run_type, dag_maker, session):
dag_id = "TEST_DAG_ID"
dag_run_id = "TEST_DAG_RUN_ID"
with dag_maker(dag_id) as dag:
task = EmptyOperator(task_id="task_id", dag=dag)
self.app.dag_bag.bag_dag(dag, root_dag=dag)
dr = dag_maker.create_dagrun(run_id=dag_run_id, run_type=run_type)
ti = dr.get_task_instance(task_id="task_id")
ti.task = task
ti.state = State.RUNNING
session.merge(ti)
session.commit()

request_json = {"state": state}

self.client.patch(
f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
json=request_json,
environ_overrides={"REMOTE_USER": "test"},
)

from airflow.models import Log
log = session.query(Log).filter(Log.dag_id == dag_id,
Log.run_id == dag_run_id,
Log.event == "api.update_dag_run_state", ).order_by(
Log.id.desc()).first()
import json
assert log.extra == json.dumps(request_json)

self.client.patch(
f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
json=request_json,
environ_overrides={"REMOTE_USER": "test"},
headers={"content-type": "application/json; charset=utf-8"},
)

log = session.query(Log).filter(Log.dag_id == dag_id,
Log.run_id == dag_run_id,
Log.event == "api.update_dag_run_state", ).order_by(
Log.id.desc()).first()
assert log.extra == json.dumps(request_json)

def test_schema_validation_error_raises(self, dag_maker, session):
dag_id = "TEST_DAG_ID"
dag_run_id = "TEST_DAG_RUN_ID"
Expand Down

0 comments on commit 8b77667

Please sign in to comment.