Skip to content

Commit 052d27f

Browse files
authored
Fix DetachedInstanceError in OpenLineage listener by using safe_getattr (#65579)
1 parent 9aa4dfb commit 052d27f

1 file changed

Lines changed: 15 additions & 5 deletions

File tree

  • providers/openlineage/src/airflow/providers/openlineage/utils

providers/openlineage/src/airflow/providers/openlineage/utils/utils.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from openlineage.client.facet_v2 import job_dependencies_run, parent_run
3131
from openlineage.client.utils import RedactMixin
3232
from openlineage.client.uuid import generate_static_uuid
33+
from sqlalchemy.orm.exc import DetachedInstanceError
3334

3435
from airflow import __version__ as AIRFLOW_VERSION
3536
from airflow.models import DagRun, TaskInstance, TaskReschedule
@@ -576,6 +577,14 @@ def is_ti_rescheduled_already(ti: TaskInstance, session=NEW_SESSION):
576577
)
577578

578579

580+
def safe_getattr(obj: Any, attr: str, default: Any = None) -> Any:
581+
"""Get attribute from object, returning default if DetachedInstanceError is raised."""
582+
try:
583+
return getattr(obj, attr, default)
584+
except DetachedInstanceError:
585+
return default
586+
587+
579588
class InfoJsonEncodable(dict):
580589
"""
581590
Airflow objects might not be json-encodable overall.
@@ -755,7 +764,7 @@ class DagRunInfo(InfoJsonEncodable):
755764
]
756765

757766
casts = {
758-
"note": lambda dagrun: getattr(dagrun, "note", None) if AIRFLOW_V_3_2_PLUS else None,
767+
"note": lambda dagrun: safe_getattr(dagrun, "note") if AIRFLOW_V_3_2_PLUS else None,
759768
"duration": lambda dagrun: DagRunInfo.duration(dagrun),
760769
"dag_bundle_name": lambda dagrun: DagRunInfo.dag_version_info(dagrun, "bundle_name"),
761770
"dag_bundle_version": lambda dagrun: DagRunInfo.dag_version_info(dagrun, "bundle_version"),
@@ -781,7 +790,7 @@ def deadlines(cls, dagrun: DagRun) -> dict[str, Any] | None:
781790
"""
782791
try:
783792
# AF2 DagRun and AF3 DagRun SDK model (on worker) do not have this information
784-
deadlines = getattr(dagrun, "deadlines", None)
793+
deadlines = safe_getattr(dagrun, "deadlines")
785794
if not deadlines:
786795
return None
787796
except Exception as err:
@@ -800,7 +809,7 @@ def deadlines(cls, dagrun: DagRun) -> dict[str, Any] | None:
800809
# deadline_alert is a lazy-loaded ORM relationship that may
801810
# trigger a DB query; keep it isolated so a detached-session
802811
# error doesn't discard the rest of the deadline info.
803-
if alert := getattr(d, "deadline_alert", None):
812+
if alert := safe_getattr(d, "deadline_alert"):
804813
info.update(
805814
{
806815
k: v
@@ -820,9 +829,10 @@ def deadlines(cls, dagrun: DagRun) -> dict[str, Any] | None:
820829
def dag_version_info(cls, dagrun: DagRun, key: str) -> str | int | None:
821830
"""Extract deg version info for given key, sourced from DagRun (on scheduler)."""
822831
# AF2 DagRun and AF3 DagRun SDK model (on worker) do not have this information
823-
if not getattr(dagrun, "dag_versions", []):
832+
dag_versions = safe_getattr(dagrun, "dag_versions", [])
833+
if not dag_versions:
824834
return None
825-
current_version = dagrun.dag_versions[-1]
835+
current_version = dag_versions[-1]
826836
if key == "bundle_name":
827837
return current_version.bundle_name
828838
if key == "bundle_version":

0 commit comments

Comments
 (0)