Skip to content

Commit 76b9efe

Browse files
committed
fix: remove backend-wide k8s_max_concurrent_pods, replaced with dedicated limits for system-wide k8s quota on cpu and memory
1 parent a0d2e91 commit 76b9efe

6 files changed

Lines changed: 54 additions & 60 deletions

File tree

backend/app/services/k8s_worker/worker.py

Lines changed: 40 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ def __init__(
6363

6464
# State tracking
6565
self._active_creations: set[str] = set()
66-
self._creation_semaphore = asyncio.Semaphore(self._settings.K8S_MAX_CONCURRENT_PODS)
6766

6867
self.logger.info(f"KubernetesWorker initialized for namespace {self._settings.K8S_NAMESPACE}")
6968

@@ -104,52 +103,51 @@ async def handle_delete_pod_command(self, command: DeletePodCommandEvent) -> Non
104103

105104
async def _create_pod_for_execution(self, command: CreatePodCommandEvent) -> None:
106105
"""Create pod for execution"""
107-
async with self._creation_semaphore:
108-
execution_id = command.execution_id
109-
self._active_creations.add(execution_id)
110-
self.metrics.update_active_pod_creations(len(self._active_creations))
106+
execution_id = command.execution_id
107+
self._active_creations.add(execution_id)
108+
self.metrics.update_active_pod_creations(len(self._active_creations))
111109

112-
start_time = time.time()
110+
start_time = time.time()
113111

114-
try:
115-
script_content = command.script
116-
entrypoint_content = await self._get_entrypoint_script()
112+
try:
113+
script_content = command.script
114+
entrypoint_content = await self._get_entrypoint_script()
117115

118-
# Create ConfigMap
119-
config_map = self.pod_builder.build_config_map(
120-
command=command, script_content=script_content, entrypoint_content=entrypoint_content
121-
)
116+
# Create ConfigMap
117+
config_map = self.pod_builder.build_config_map(
118+
command=command, script_content=script_content, entrypoint_content=entrypoint_content
119+
)
122120

123-
await self._create_config_map(config_map)
121+
await self._create_config_map(config_map)
124122

125-
pod = self.pod_builder.build_pod_manifest(command=command)
126-
created_pod = await self._create_pod(pod)
123+
pod = self.pod_builder.build_pod_manifest(command=command)
124+
created_pod = await self._create_pod(pod)
127125

128-
# Set ownerReference so K8s garbage-collects the ConfigMap when the pod is deleted
129-
if created_pod and created_pod.metadata and created_pod.metadata.uid:
130-
await self._set_configmap_owner(config_map, created_pod)
126+
# Set ownerReference so K8s garbage-collects the ConfigMap when the pod is deleted
127+
if created_pod and created_pod.metadata and created_pod.metadata.uid:
128+
await self._set_configmap_owner(config_map, created_pod)
131129

132-
# Publish PodCreated event
133-
await self._publish_pod_created(command, pod)
130+
# Publish PodCreated event
131+
await self._publish_pod_created(command, pod)
134132

135-
# Update metrics
136-
duration = time.time() - start_time
137-
self.metrics.record_k8s_pod_creation_duration(duration, command.language)
138-
self.metrics.record_k8s_pod_created("success", command.language)
133+
# Update metrics
134+
duration = time.time() - start_time
135+
self.metrics.record_k8s_pod_creation_duration(duration, command.language)
136+
self.metrics.record_k8s_pod_created("success", command.language)
139137

140-
self.logger.info(
141-
f"Successfully created pod {pod.metadata.name} for execution {execution_id}. "
142-
f"Duration: {duration:.2f}s"
143-
)
138+
self.logger.info(
139+
f"Successfully created pod {pod.metadata.name} for execution {execution_id}. "
140+
f"Duration: {duration:.2f}s"
141+
)
144142

145-
except Exception as e:
146-
self.logger.error(f"Failed to create pod for execution {execution_id}: {e}", exc_info=True)
147-
self.metrics.record_k8s_pod_created("failed", "unknown")
148-
await self._publish_pod_creation_failed(command, str(e))
143+
except Exception as e:
144+
self.logger.error(f"Failed to create pod for execution {execution_id}: {e}", exc_info=True)
145+
self.metrics.record_k8s_pod_created("failed", "unknown")
146+
await self._publish_pod_creation_failed(command, str(e))
149147

150-
finally:
151-
self._active_creations.discard(execution_id)
152-
self.metrics.update_active_pod_creations(len(self._active_creations))
148+
finally:
149+
self._active_creations.discard(execution_id)
150+
self.metrics.update_active_pod_creations(len(self._active_creations))
153151

154152
async def _get_entrypoint_script(self) -> str:
155153
"""Get entrypoint script content"""
@@ -257,7 +255,7 @@ async def ensure_namespace_security(self) -> None:
257255
258256
Creates:
259257
- Default-deny NetworkPolicy for executor pods (blocks lateral movement and exfiltration)
260-
- ResourceQuota to cap aggregate pod/resource consumption
258+
- ResourceQuota to cap aggregate CPU/memory consumption (no pod count limit)
261259
- Pod Security Admission labels (Restricted profile)
262260
"""
263261
namespace = self._settings.K8S_NAMESPACE
@@ -293,9 +291,8 @@ async def _ensure_executor_network_policy(self, namespace: str) -> None:
293291
self.logger.info(f"NetworkPolicy '{policy_name}' applied in namespace {namespace}")
294292

295293
async def _ensure_executor_resource_quota(self, namespace: str) -> None:
296-
"""Create or update ResourceQuota to cap aggregate executor pod consumption."""
294+
"""Create or update ResourceQuota to cap aggregate CPU/memory in the executor namespace."""
297295
quota_name = "executor-quota"
298-
n = self._settings.K8S_MAX_CONCURRENT_PODS
299296

300297
quota = k8s_client.V1ResourceQuota(
301298
api_version="v1",
@@ -307,11 +304,10 @@ async def _ensure_executor_resource_quota(self, namespace: str) -> None:
307304
),
308305
spec=k8s_client.V1ResourceQuotaSpec(
309306
hard={
310-
"pods": str(n),
311-
"requests.cpu": f"{int(self._settings.K8S_POD_CPU_REQUEST.removesuffix('m')) * n}m",
312-
"requests.memory": f"{int(self._settings.K8S_POD_MEMORY_REQUEST.removesuffix('Mi')) * n}Mi",
313-
"limits.cpu": f"{int(self._settings.K8S_POD_CPU_LIMIT.removesuffix('m')) * n}m",
314-
"limits.memory": f"{int(self._settings.K8S_POD_MEMORY_LIMIT.removesuffix('Mi')) * n}Mi",
307+
"requests.cpu": self._settings.K8S_QUOTA_CPU,
308+
"requests.memory": self._settings.K8S_QUOTA_MEMORY,
309+
"limits.cpu": self._settings.K8S_QUOTA_CPU,
310+
"limits.memory": self._settings.K8S_QUOTA_MEMORY,
315311
},
316312
),
317313
)

backend/app/services/runtime_settings.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,5 @@ def _build_toml_defaults(self) -> SystemSettings:
5151
max_timeout_seconds=s.K8S_POD_EXECUTION_TIMEOUT,
5252
memory_limit=s.K8S_POD_MEMORY_LIMIT,
5353
cpu_limit=s.K8S_POD_CPU_LIMIT,
54-
max_concurrent_executions=s.K8S_MAX_CONCURRENT_PODS,
5554
session_timeout_minutes=s.ACCESS_TOKEN_EXPIRE_MINUTES,
5655
)

backend/app/settings.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,6 @@ def __init__(
6868
# Kubernetes namespace for execution pods
6969
K8S_NAMESPACE: str = "integr8scode"
7070

71-
# Maximum concurrent pod creations allowed by k8s worker
72-
K8S_MAX_CONCURRENT_PODS: int = 10
73-
7471
# Settings for Kubernetes resource limits and requests
7572
K8S_POD_CPU_LIMIT: str = "1000m"
7673
K8S_POD_MEMORY_LIMIT: str = "128Mi"
@@ -80,6 +77,10 @@ def __init__(
8077
K8S_POD_PRIORITY_CLASS_NAME: str | None = None
8178
K8S_POD_RUNTIME_CLASS_NAME: str | None = None # e.g. "gvisor" for sandboxed execution
8279

80+
# Namespace-level ResourceQuota caps (total budget, not per-pod)
81+
K8S_QUOTA_CPU: str = "10000m"
82+
K8S_QUOTA_MEMORY: str = "1280Mi"
83+
8384
SUPPORTED_RUNTIMES: dict[str, LanguageInfoDomain] = Field(default_factory=lambda: RUNTIME_MATRIX)
8485

8586
EXAMPLE_SCRIPTS: dict[str, str] = Field(default_factory=lambda: EXEC_EXAMPLE_SCRIPTS)

backend/tests/e2e/test_admin_settings_routes.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,6 @@ async def test_reset_system_settings(
172172
max_timeout_seconds=test_settings.K8S_POD_EXECUTION_TIMEOUT,
173173
memory_limit=test_settings.K8S_POD_MEMORY_LIMIT,
174174
cpu_limit=test_settings.K8S_POD_CPU_LIMIT,
175-
max_concurrent_executions=test_settings.K8S_MAX_CONCURRENT_PODS,
176175
session_timeout_minutes=test_settings.ACCESS_TOKEN_EXPIRE_MINUTES,
177176
)
178177
assert settings == expected

backend/tests/unit/services/test_runtime_settings.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ def _make_settings() -> Settings:
1919
"K8S_POD_EXECUTION_TIMEOUT": 30,
2020
"K8S_POD_MEMORY_LIMIT": "128Mi",
2121
"K8S_POD_CPU_LIMIT": "1000m",
22-
"K8S_MAX_CONCURRENT_PODS": 5,
2322
"ACCESS_TOKEN_EXPIRE_MINUTES": 60,
2423
})
2524

@@ -58,7 +57,7 @@ async def test_passes_toml_defaults_to_repo() -> None:
5857
assert defaults.max_timeout_seconds == 30
5958
assert defaults.memory_limit == "128Mi"
6059
assert defaults.cpu_limit == "1000m"
61-
assert defaults.max_concurrent_executions == 5
60+
assert defaults.max_concurrent_executions == 10
6261
assert defaults.session_timeout_minutes == 60
6362

6463

docs/security/policies.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,17 @@ This policy matches pods with the `component: executor` label, which the pod bui
6262

6363
### Resource Quota
6464

65-
A ResourceQuota caps aggregate resource consumption in the executor namespace:
65+
A ResourceQuota caps aggregate CPU and memory in the executor namespace. If a new pod's requests would exceed
66+
the remaining quota, the API server rejects the creation at admission time (it does not queue the pod as Pending); the worker catches this error and reports the execution as failed.
6667

67-
| Resource | Limit | Derivation |
68-
|-------------------|-----------------------------------|----------------------------------|
69-
| `pods` | `K8S_MAX_CONCURRENT_PODS` | Maximum concurrent executor pods |
70-
| `requests.cpu` | `K8S_MAX_CONCURRENT_PODS` cores | 1 core per pod |
71-
| `requests.memory` | `K8S_MAX_CONCURRENT_PODS × 128Mi` | 128Mi per pod |
72-
| `limits.cpu` | Same as requests | Prevents burst beyond quota |
73-
| `limits.memory` | Same as requests | Prevents OOM beyond quota |
68+
| Resource | Limit | Source setting |
69+
|-------------------|--------------------|--------------------|
70+
| `requests.cpu` | `K8S_QUOTA_CPU` | Namespace CPU cap |
71+
| `requests.memory` | `K8S_QUOTA_MEMORY` | Namespace memory cap |
72+
| `limits.cpu` | `K8S_QUOTA_CPU` | Same as requests |
73+
| `limits.memory` | `K8S_QUOTA_MEMORY` | Same as requests |
7474

75-
This prevents a burst of executions from starving other workloads in the cluster.
75+
No `pods` count in the quota — concurrency is controlled by the execution queue (`max_concurrent_executions`).
7676

7777
### Pod Security Admission (PSA)
7878

0 commit comments

Comments
 (0)