diff --git a/airflow-core/src/airflow/triggers/base.py b/airflow-core/src/airflow/triggers/base.py index 7ca7ed20a7463..205e830c365aa 100644 --- a/airflow-core/src/airflow/triggers/base.py +++ b/airflow-core/src/airflow/triggers/base.py @@ -109,8 +109,26 @@ def task_instance(self, value: TaskInstance | None) -> None: if self.task_instance: self.task_id = self.task_instance.task_id if self.task: - self.template_fields = self.task.template_fields self.template_ext = self.task.template_ext + # Only keep operator template_fields that are also keys in + # start_trigger_args.trigger_kwargs *and* exist on the trigger. + # Using the full operator template_fields would cause + # AttributeError when the trigger does not have attributes with + # the same names as the operator (e.g. "bash_command"). + # + # When start_trigger_args is None (normal defer path), the triggerer + # does not build a template context, so render_template_fields is + # never called and empty template_fields is safe. + start_trigger_args = getattr(self.task, "start_trigger_args", None) + trigger_kwarg_keys = ( + set((start_trigger_args.trigger_kwargs or {}).keys()) if start_trigger_args else set() + ) + if trigger_kwarg_keys: + self.template_fields = tuple( + f for f in self.task.template_fields if f in trigger_kwarg_keys and hasattr(self, f) + ) + else: + self.template_fields = () def render_template_fields( self, @@ -127,7 +145,8 @@ def render_template_fields( """ if not jinja_env: jinja_env = self.get_template_env() - # We only need to render templated fields if templated fields are part of the start_trigger_args + # self.template_fields is already filtered (in the task_instance setter) to only + # include fields present in start_trigger_args.trigger_kwargs and on this trigger. self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) @abc.abstractmethod diff --git a/airflow-core/tests/unit/triggers/test_base_trigger.py b/airflow-core/tests/unit/triggers/test_base_trigger.py index 53066c46f6a14..d9e38385a2052 100644 --- a/airflow-core/tests/unit/triggers/test_base_trigger.py +++ b/airflow-core/tests/unit/triggers/test_base_trigger.py @@ -27,6 +27,18 @@ class DummyOperator(BaseOperator): template_fields = ("name",) +class OperatorWithExtraTemplateFields(BaseOperator): + """Operator whose template_fields do NOT all exist on the trigger.""" + + template_fields = ("bash_command", "env", "name") + + def __init__(self, bash_command="", env=None, name="", **kwargs): + super().__init__(**kwargs) + self.bash_command = bash_command + self.env = env + self.name = name + + class DummyTrigger(BaseTrigger): def __init__(self, name: str, **kwargs): super().__init__(**kwargs) @@ -67,3 +79,62 @@ def test_render_template_fields(create_task_instance): trigger.render_template_fields(context={"name": "world"}) assert trigger.name == "Hello world" + + +@pytest.mark.db_test +def test_render_template_fields_filters_to_trigger_kwargs(create_task_instance): + """Only fields present in both trigger_kwargs and on the trigger should be rendered. + + Operator template_fields like 'bash_command' and 'env' that don't exist on the + trigger must be excluded to avoid AttributeError. + """ + op = OperatorWithExtraTemplateFields( + task_id="extra_fields_task", + bash_command="echo hello", + env={"KEY": "val"}, + name="static", + ) + ti = create_task_instance( + task=op, + start_from_trigger=True, + start_trigger_args=StartTriggerArgs( + trigger_cls=f"{DummyTrigger.__module__}.{DummyTrigger.__qualname__}", + next_method="resume_method", + trigger_kwargs={"name": "Hello {{ name }}"}, + ), + ) + + trigger = DummyTrigger(name="Hello {{ name }}") + trigger.task_instance = ti + + # Only 'name' should be in template_fields; 'bash_command' and 'env' are excluded + # because they aren't keys in trigger_kwargs or don't exist on the trigger. + assert trigger.template_fields == ("name",) + + # Rendering must not raise AttributeError for missing operator fields + trigger.render_template_fields(context={"name": "world"}) + assert trigger.name == "Hello world" + + +@pytest.mark.db_test +def test_render_template_fields_empty_when_no_trigger_kwargs(create_task_instance): + """When start_trigger_args has no trigger_kwargs, template_fields should be empty.""" + op = DummyOperator(task_id="no_kwargs_task") + ti = create_task_instance( + task=op, + start_from_trigger=True, + start_trigger_args=StartTriggerArgs( + trigger_cls=f"{DummyTrigger.__module__}.{DummyTrigger.__qualname__}", + next_method="resume_method", + trigger_kwargs=None, + ), + ) + + trigger = DummyTrigger(name="Hello {{ name }}") + trigger.task_instance = ti + + assert trigger.template_fields == () + + # Rendering with empty template_fields is a no-op + trigger.render_template_fields(context={"name": "world"}) + assert trigger.name == "Hello {{ name }}"