@@ -183,11 +183,9 @@ async def _process_pod_event(self, event: PodEvent) -> None:
183183 # Map to application events
184184 app_events = await self ._event_mapper .map_pod_event (event .pod , event .event_type )
185185
186- # Publish events
187186 for app_event in app_events :
188187 await self ._publish_event (app_event , event .pod )
189188
190- # Delete pod once all data has been extracted and terminal events published
191189 if any (e .event_type in _TERMINAL_EVENT_TYPES for e in app_events ):
192190 await self ._delete_pod (event .pod )
193191
@@ -207,22 +205,18 @@ async def _process_pod_event(self, event: PodEvent) -> None:
207205
208206 async def _publish_event (self , event : DomainEvent , pod : k8s_client .V1Pod ) -> None :
209207 """Publish event to Kafka and store in events collection."""
210- try :
211- execution_id = getattr (event , "execution_id" , None ) or event .aggregate_id
212- key = str (execution_id or (pod .metadata .name if pod .metadata else "unknown" ))
213-
214- await self ._kafka_event_service .publish_event (event = event , key = key )
208+ execution_id = getattr (event , "execution_id" , None ) or event .aggregate_id
209+ key = str (execution_id or (pod .metadata .name if pod .metadata else "unknown" ))
215210
216- phase = pod .status .phase if pod .status else "Unknown"
217- self ._metrics .record_pod_monitor_event_published (event .event_type , phase )
211+ await self ._kafka_event_service .publish_event (event = event , key = key )
218212
219- except Exception as e :
220- self .logger . error ( f"Error publishing event: { e } " , exc_info = True )
213+ phase = pod . status . phase if pod . status else "Unknown"
214+ self ._metrics . record_pod_monitor_event_published ( event . event_type , phase )
221215
222216 async def _delete_pod (self , pod : k8s_client .V1Pod ) -> None :
223217 """Delete a pod after its data has been fully extracted.
224218
225- Frees the ResourceQuota slot so new executor pods can be scheduled .
219+ Frees the Kueue quota slot so gated executor pods can be admitted .
226220 The ConfigMap is garbage-collected automatically via ownerReference.
227221 """
228222 pod_name = pod .metadata .name
0 commit comments