diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py index 448d0b6945abf..084dbae8b380f 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py @@ -32,9 +32,12 @@ from airflow.api_fastapi.compat import HTTP_422_UNPROCESSABLE_CONTENT from airflow.api_fastapi.execution_api.datamodels.dagrun import DagRunStateResponse, TriggerDAGRunPayload from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun +from airflow.api_fastapi.execution_api.datamodels.token import TIToken +from airflow.api_fastapi.execution_api.security import CurrentTIToken from airflow.exceptions import DagRunAlreadyExists from airflow.models.dag import DagModel from airflow.models.dagrun import DagRun as DagRunModel +from airflow.models.taskinstance import TaskInstance from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -94,6 +97,7 @@ def trigger_dag_run( run_id: str, payload: TriggerDAGRunPayload, session: SessionDep, + token: TIToken = CurrentTIToken, ) -> None: """Trigger a Dag run.""" dm = session.scalar(select(DagModel).where(~DagModel.is_stale, DagModel.dag_id == dag_id).limit(1)) @@ -121,6 +125,11 @@ def trigger_dag_run( }, ) + # Inherit triggering_user_name from the calling task's DagRun so chains of + # TriggerDagRunOperator preserve the original human user across child runs. + parent_ti = session.get(TaskInstance, token.id) + triggering_user_name = parent_ti.dag_run.triggering_user_name if parent_ti else None + try: trigger_dag( dag_id=dag_id, @@ -129,6 +138,7 @@ def trigger_dag_run( conf=payload.conf, logical_date=payload.logical_date, triggered_by=DagRunTriggeredByType.OPERATOR, + triggering_user_name=triggering_user_name, replace_microseconds=False, partition_key=payload.partition_key, note=payload.note, diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py index b84b993f1d63d..337bc1c90b8d5 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py @@ -19,9 +19,12 @@ import pytest import time_machine +from fastapi import Request from sqlalchemy import select, update from airflow._shared.timezones import timezone +from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, TIToken +from airflow.api_fastapi.execution_api.security import require_auth from airflow.models import DagModel from airflow.models.dagrun import DagRun from airflow.providers.standard.operators.empty import EmptyOperator @@ -219,6 +222,44 @@ def test_trigger_dag_run_already_exists(self, client, session, dag_maker): } } + @pytest.mark.parametrize("parent_triggering_user_name", ["alice", None]) + def test_trigger_dag_run_inherits_triggering_user_name( + self, client, exec_app, session, dag_maker, parent_triggering_user_name + ): + """Child DAG run inherits triggering_user_name from the calling task's parent run.""" + parent_dag_id = "parent_dag_inherits" + parent_run_id = "parent_run" + child_dag_id = "child_dag_inherits" + child_run_id = "child_run" + logical_date = timezone.datetime(2025, 2, 20) + + with dag_maker(dag_id=parent_dag_id, session=session, serialized=True): + EmptyOperator(task_id="trigger_task") + parent_run = dag_maker.create_dagrun( + run_id=parent_run_id, triggering_user_name=parent_triggering_user_name + ) + parent_ti = parent_run.task_instances[0] + + with dag_maker(dag_id=child_dag_id, session=session, serialized=True): + EmptyOperator(task_id="child_task") + session.commit() + + async def auth_as_parent_ti(request: Request) -> TIToken: + return TIToken(id=parent_ti.id, claims=TIClaims(scope="execution")) + + exec_app.dependency_overrides[require_auth] = auth_as_parent_ti + try: + response = client.post( + f"/execution/dag-runs/{child_dag_id}/{child_run_id}", + json={"logical_date": logical_date.isoformat()}, + ) + finally: + exec_app.dependency_overrides.pop(require_auth, None) + + assert response.status_code == 204 + child_run = session.scalars(select(DagRun).where(DagRun.run_id == child_run_id)).one() + assert child_run.triggering_user_name == parent_triggering_user_name + class TestDagRunClear: def setup_method(self):