|
6 | 6 | import structlog |
7 | 7 | from kubernetes_asyncio import client as k8s_client |
8 | 8 | from kubernetes_asyncio import watch as k8s_watch |
| 9 | +from kubernetes_asyncio.client.rest import ApiException |
9 | 10 |
|
10 | 11 | from app.core.metrics import KubernetesMetrics |
11 | 12 | from app.core.utils import StringEnum |
| 13 | +from app.domain.enums import EventType |
12 | 14 | from app.domain.events import DomainEvent |
13 | 15 | from app.services.kafka_event_service import KafkaEventService |
14 | 16 | from app.services.pod_monitor.config import PodMonitorConfig |
15 | 17 | from app.services.pod_monitor.event_mapper import PodEventMapper, WatchEventType |
16 | 18 |
|
| 19 | +_TERMINAL_EVENT_TYPES: frozenset[str] = frozenset({ |
| 20 | + EventType.EXECUTION_COMPLETED, |
| 21 | + EventType.EXECUTION_FAILED, |
| 22 | + EventType.EXECUTION_TIMEOUT, |
| 23 | +}) |
| 24 | + |
17 | 25 | # Type aliases |
18 | 26 | type ResourceVersion = str |
19 | 27 | type KubeEvent = dict[str, Any] |
@@ -179,6 +187,10 @@ async def _process_pod_event(self, event: PodEvent) -> None: |
179 | 187 | for app_event in app_events: |
180 | 188 | await self._publish_event(app_event, event.pod) |
181 | 189 |
|
| 190 | + # Delete pod once all data has been extracted and terminal events published |
| 191 | + if any(e.event_type in _TERMINAL_EVENT_TYPES for e in app_events): |
| 192 | + await self._delete_pod(event.pod) |
| 193 | + |
182 | 194 | if app_events: |
183 | 195 | self.logger.info( |
184 | 196 | f"Processed {event.event_type} event for pod {pod_name} " |
@@ -206,3 +218,21 @@ async def _publish_event(self, event: DomainEvent, pod: k8s_client.V1Pod) -> Non |
206 | 218 |
|
207 | 219 | except Exception as e: |
208 | 220 | self.logger.error(f"Error publishing event: {e}", exc_info=True) |
| 221 | + |
| 222 | + async def _delete_pod(self, pod: k8s_client.V1Pod) -> None: |
| 223 | + """Delete a pod after its data has been fully extracted. |
| 224 | +
|
| 225 | + Frees the ResourceQuota slot so new executor pods can be scheduled. |
| 226 | + The ConfigMap is garbage-collected automatically via ownerReference. |
| 227 | + """ |
| 228 | + pod_name = pod.metadata.name |
| 229 | + try: |
| 230 | + await self._v1.delete_namespaced_pod( |
| 231 | + name=pod_name, namespace=pod.metadata.namespace, grace_period_seconds=0, |
| 232 | + ) |
| 233 | + self.logger.info(f"Deleted completed pod {pod_name}") |
| 234 | + except ApiException as e: |
| 235 | + if e.status == 404: |
| 236 | + self.logger.debug(f"Pod {pod_name} already deleted") |
| 237 | + else: |
| 238 | + self.logger.warning(f"Failed to delete pod {pod_name}: {e.reason}") |
0 commit comments