11import asyncio
22import time
33from pathlib import Path
4- from typing import Any
54
65import structlog
76from kubernetes_asyncio import client as k8s_client
@@ -267,7 +266,7 @@ async def ensure_namespace_security(self) -> None:
267266 await self ._apply_psa_labels (namespace )
268267
269268 async def _ensure_executor_network_policy (self , namespace : str ) -> None :
270- """Create default-deny NetworkPolicy for executor pods."""
269+ """Create or update default-deny NetworkPolicy for executor pods."""
271270 policy_name = "executor-deny-all"
272271
273272 policy = k8s_client .V1NetworkPolicy (
@@ -279,31 +278,24 @@ async def _ensure_executor_network_policy(self, namespace: str) -> None:
279278 labels = {"app" : "integr8s" , "component" : "security" },
280279 ),
281280 spec = k8s_client .V1NetworkPolicySpec (
282- pod_selector = k8s_client .V1LabelSelector (
283- match_labels = {"component" : "executor" },
284- ),
281+ pod_selector = k8s_client .V1LabelSelector (match_labels = {"component" : "executor" }),
285282 policy_types = ["Ingress" , "Egress" ],
286283 ingress = [],
287284 egress = [],
288285 ),
289286 )
290287
291- try :
292- await self .networking_v1 .read_namespaced_network_policy (name = policy_name , namespace = namespace )
293- await self .networking_v1 .replace_namespaced_network_policy (
294- name = policy_name , namespace = namespace , body = policy ,
295- )
296- self .logger .info (f"NetworkPolicy '{ policy_name } ' updated in namespace { namespace } " )
297- except ApiException as e :
298- if e .status == 404 :
299- await self .networking_v1 .create_namespaced_network_policy (namespace = namespace , body = policy )
300- self .logger .info (f"NetworkPolicy '{ policy_name } ' created in namespace { namespace } " )
301- else :
302- self .logger .error (f"Failed to apply NetworkPolicy '{ policy_name } ': { e .reason } " )
288+ await self .networking_v1 .patch_namespaced_network_policy ( # type: ignore[call-arg]
289+ name = policy_name , namespace = namespace , body = policy ,
290+ field_manager = "integr8s" , force = True ,
291+ _content_type = "application/apply-patch+yaml" ,
292+ )
293+ self .logger .info (f"NetworkPolicy '{ policy_name } ' applied in namespace { namespace } " )
303294
304295 async def _ensure_executor_resource_quota (self , namespace : str ) -> None :
305- """Create ResourceQuota to cap aggregate executor pod consumption."""
296+ """Create or update ResourceQuota to cap aggregate executor pod consumption."""
306297 quota_name = "executor-quota"
298+ n = self ._settings .K8S_MAX_CONCURRENT_PODS
307299
308300 quota = k8s_client .V1ResourceQuota (
309301 api_version = "v1" ,
@@ -315,25 +307,21 @@ async def _ensure_executor_resource_quota(self, namespace: str) -> None:
315307 ),
316308 spec = k8s_client .V1ResourceQuotaSpec (
317309 hard = {
318- "pods" : str (self . _settings . K8S_MAX_CONCURRENT_PODS ),
319- "requests.cpu" : f"{ self ._settings .K8S_MAX_CONCURRENT_PODS } " ,
320- "requests.memory" : f"{ self ._settings .K8S_MAX_CONCURRENT_PODS * 128 } Mi" ,
321- "limits.cpu" : f"{ self ._settings .K8S_MAX_CONCURRENT_PODS } " ,
322- "limits.memory" : f"{ self ._settings .K8S_MAX_CONCURRENT_PODS * 128 } Mi" ,
310+ "pods" : str (n ),
311+ "requests.cpu" : f"{ int ( self ._settings .K8S_POD_CPU_REQUEST . removesuffix ( 'm' )) * n } m " ,
312+ "requests.memory" : f"{ int ( self ._settings .K8S_POD_MEMORY_REQUEST . removesuffix ( 'Mi' )) * n } Mi" ,
313+ "limits.cpu" : f"{ int ( self ._settings .K8S_POD_CPU_LIMIT . removesuffix ( 'm' )) * n } m " ,
314+ "limits.memory" : f"{ int ( self ._settings .K8S_POD_MEMORY_LIMIT . removesuffix ( 'Mi' )) * n } Mi" ,
323315 },
324316 ),
325317 )
326318
327- try :
328- await self .v1 .read_namespaced_resource_quota (name = quota_name , namespace = namespace )
329- await self .v1 .replace_namespaced_resource_quota (name = quota_name , namespace = namespace , body = quota )
330- self .logger .info (f"ResourceQuota '{ quota_name } ' updated in namespace { namespace } " )
331- except ApiException as e :
332- if e .status == 404 :
333- await self .v1 .create_namespaced_resource_quota (namespace = namespace , body = quota )
334- self .logger .info (f"ResourceQuota '{ quota_name } ' created in namespace { namespace } " )
335- else :
336- self .logger .error (f"Failed to apply ResourceQuota '{ quota_name } ': { e .reason } " )
319+ await self .v1 .patch_namespaced_resource_quota ( # type: ignore[call-arg]
320+ name = quota_name , namespace = namespace , body = quota ,
321+ field_manager = "integr8s" , force = True ,
322+ _content_type = "application/apply-patch+yaml" ,
323+ )
324+ self .logger .info (f"ResourceQuota '{ quota_name } ' applied in namespace { namespace } " )
337325
338326 async def _apply_psa_labels (self , namespace : str ) -> None :
339327 """Apply Pod Security Admission labels to the executor namespace."""
@@ -344,69 +332,54 @@ async def _apply_psa_labels(self, namespace: str) -> None:
344332 "pod-security.kubernetes.io/audit" : "restricted" ,
345333 }
346334
347- patch_body = {"metadata" : {"labels" : psa_labels }}
348- try :
349- await self .v1 .patch_namespace (name = namespace , body = patch_body )
350- self .logger .info (f"Pod Security Admission labels applied to namespace { namespace } " )
351- except ApiException as e :
352- self .logger .error (f"Failed to apply PSA labels to namespace { namespace } : { e .reason } " )
335+ await self .v1 .patch_namespace (name = namespace , body = {"metadata" : {"labels" : psa_labels }})
336+ self .logger .info (f"Pod Security Admission labels applied to namespace { namespace } " )
353337
354338 async def ensure_image_pre_puller_daemonset (self ) -> None :
355339 """Ensure the runtime image pre-puller DaemonSet exists."""
356340 daemonset_name = "runtime-image-pre-puller"
357341 namespace = self ._settings .K8S_NAMESPACE
358342
359343 try :
360- init_containers = []
344+ init_containers : list [ k8s_client . V1Container ] = []
361345 all_images = {config .image for lang in RUNTIME_REGISTRY .values () for config in lang .values ()}
362346
363347 for i , image_ref in enumerate (sorted (all_images )):
364348 sanitized_image_ref = image_ref .split ("/" )[- 1 ].replace (":" , "-" ).replace ("." , "-" ).replace ("_" , "-" )
365349 self .logger .info (f"DAEMONSET: before: { image_ref } -> { sanitized_image_ref } " )
366- container_name = f"pull-{ i } -{ sanitized_image_ref } "
367- init_containers .append (
368- {
369- "name" : container_name ,
370- "image" : image_ref ,
371- "command" : ["/bin/sh" , "-c" , f'echo "Image { image_ref } pulled."' ],
372- "imagePullPolicy" : "Always" ,
373- }
374- )
375-
376- manifest : dict [str , Any ] = {
377- "apiVersion" : "apps/v1" ,
378- "kind" : "DaemonSet" ,
379- "metadata" : {"name" : daemonset_name , "namespace" : namespace },
380- "spec" : {
381- "selector" : {"matchLabels" : {"name" : daemonset_name }},
382- "template" : {
383- "metadata" : {"labels" : {"name" : daemonset_name }},
384- "spec" : {
385- "initContainers" : init_containers ,
386- "containers" : [{"name" : "pause" , "image" : "registry.k8s.io/pause:3.9" }],
387- "tolerations" : [{"operator" : "Exists" }],
388- },
389- },
390- "updateStrategy" : {"type" : "RollingUpdate" },
391- },
392- }
350+ init_containers .append (k8s_client .V1Container (
351+ name = f"pull-{ i } -{ sanitized_image_ref } " ,
352+ image = image_ref ,
353+ command = ["/bin/sh" , "-c" , f'echo "Image { image_ref } pulled."' ],
354+ image_pull_policy = "Always" ,
355+ ))
356+
357+ daemonset = k8s_client .V1DaemonSet (
358+ api_version = "apps/v1" ,
359+ kind = "DaemonSet" ,
360+ metadata = k8s_client .V1ObjectMeta (name = daemonset_name , namespace = namespace ),
361+ spec = k8s_client .V1DaemonSetSpec (
362+ selector = k8s_client .V1LabelSelector (match_labels = {"name" : daemonset_name }),
363+ template = k8s_client .V1PodTemplateSpec (
364+ metadata = k8s_client .V1ObjectMeta (labels = {"name" : daemonset_name }),
365+ spec = k8s_client .V1PodSpec (
366+ init_containers = init_containers ,
367+ containers = [k8s_client .V1Container (
368+ name = "pause" , image = "registry.k8s.io/pause:3.9" ,
369+ )],
370+ tolerations = [k8s_client .V1Toleration (operator = "Exists" )],
371+ ),
372+ ),
373+ update_strategy = k8s_client .V1DaemonSetUpdateStrategy (type = "RollingUpdate" ),
374+ ),
375+ )
393376
394- try :
395- await self .apps_v1 .read_namespaced_daemon_set (name = daemonset_name , namespace = namespace )
396- self .logger .info (f"DaemonSet '{ daemonset_name } ' exists. Replacing to ensure it is up-to-date." )
397- await self .apps_v1 .replace_namespaced_daemon_set (
398- name = daemonset_name , namespace = namespace , body = manifest # type: ignore[arg-type]
399- )
400- self .logger .info (f"DaemonSet '{ daemonset_name } ' replaced successfully." )
401- except ApiException as e :
402- if e .status == 404 :
403- self .logger .info (f"DaemonSet '{ daemonset_name } ' not found. Creating..." )
404- await self .apps_v1 .create_namespaced_daemon_set (
405- namespace = namespace , body = manifest # type: ignore[arg-type]
406- )
407- self .logger .info (f"DaemonSet '{ daemonset_name } ' created successfully." )
408- else :
409- raise
377+ await self .apps_v1 .patch_namespaced_daemon_set ( # type: ignore[call-arg]
378+ name = daemonset_name , namespace = namespace , body = daemonset ,
379+ field_manager = "integr8s" , force = True ,
380+ _content_type = "application/apply-patch+yaml" ,
381+ )
382+ self .logger .info (f"DaemonSet '{ daemonset_name } ' applied successfully" )
410383
411384 except ApiException as e :
412385 self .logger .error (f"K8s API error applying DaemonSet '{ daemonset_name } ': { e .reason } " , exc_info = True )
0 commit comments