Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/release-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,14 @@ jobs:
MAILJET_FROM_ADDRESS: ${{ secrets.MAILJET_FROM_ADDRESS }}
MAILJET_HOST: ${{ secrets.MAILJET_HOST }}
GRAFANA_ALERT_RECIPIENTS: ${{ secrets.GRAFANA_ALERT_RECIPIENTS }}
REDIS_PASSWORD: ${{ secrets.REDIS_PASSWORD }}
MONGO_ROOT_USER: ${{ secrets.MONGO_ROOT_USER }}
MONGO_ROOT_PASSWORD: ${{ secrets.MONGO_ROOT_PASSWORD }}
with:
host: ${{ secrets.DEPLOY_HOST }}
username: ${{ secrets.DEPLOY_USER }}
key: ${{ secrets.DEPLOY_SSH_KEY }}
envs: GHCR_TOKEN,GHCR_USER,IMAGE_TAG,GRAFANA_ADMIN_USER,GRAFANA_ADMIN_PASSWORD,MAILJET_API_KEY,MAILJET_SECRET_KEY,MAILJET_FROM_ADDRESS,MAILJET_HOST,GRAFANA_ALERT_RECIPIENTS
envs: GHCR_TOKEN,GHCR_USER,IMAGE_TAG,GRAFANA_ADMIN_USER,GRAFANA_ADMIN_PASSWORD,MAILJET_API_KEY,MAILJET_SECRET_KEY,MAILJET_FROM_ADDRESS,MAILJET_HOST,GRAFANA_ALERT_RECIPIENTS,REDIS_PASSWORD,MONGO_ROOT_USER,MONGO_ROOT_PASSWORD
command_timeout: 10m
script: |
set -e
Expand All @@ -153,6 +156,9 @@ jobs:
export MAILJET_FROM_ADDRESS="$MAILJET_FROM_ADDRESS"
export GF_SMTP_HOST="$MAILJET_HOST"
export GRAFANA_ALERT_RECIPIENTS="$GRAFANA_ALERT_RECIPIENTS"
export REDIS_PASSWORD="$REDIS_PASSWORD"
export MONGO_ROOT_USER="$MONGO_ROOT_USER"
export MONGO_ROOT_PASSWORD="$MONGO_ROOT_PASSWORD"
docker compose pull
docker compose up -d --remove-orphans --no-build --wait --wait-timeout 180

Expand Down
15 changes: 7 additions & 8 deletions backend/app/core/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import jwt
from fastapi import Request
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from pwdlib import PasswordHash
from pwdlib.hashers.bcrypt import BcryptHasher

from app.core.metrics import SecurityMetrics
from app.domain.user import CSRFValidationError, InvalidCredentialsError
Expand All @@ -20,17 +21,15 @@ def __init__(self, settings: Settings, security_metrics: SecurityMetrics) -> Non
self.settings = settings
self._security_metrics = security_metrics
# --8<-- [start:password_hashing]
self.pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__rounds=self.settings.BCRYPT_ROUNDS,
)
self._password_hash = PasswordHash((
BcryptHasher(rounds=self.settings.BCRYPT_ROUNDS),
))

def verify_password(self, plain_password: str, hashed_password: str) -> bool:
    """Check *plain_password* against a stored bcrypt hash via pwdlib.

    Returns True when the password matches, False otherwise.
    """
    # The pre-migration passlib line (`self.pwd_context.verify(...)`) was left
    # in the paste alongside its replacement; only the pwdlib call is kept.
    return self._password_hash.verify(plain_password, hashed_password)

def get_password_hash(self, password: str) -> str:
    """Hash *password* with bcrypt (rounds taken from settings) via pwdlib."""
    # The pre-migration passlib line (`self.pwd_context.hash(...)`) was left
    # in the paste alongside its replacement; only the pwdlib call is kept.
    return self._password_hash.hash(password)
# --8<-- [end:password_hashing]

# --8<-- [start:create_access_token]
Expand Down
2 changes: 2 additions & 0 deletions backend/app/services/k8s_worker/pod_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ def _build_pod_spec(
containers=[container],
restart_policy="Never",
active_deadline_seconds=timeout,
runtime_class_name=self._settings.K8S_POD_RUNTIME_CLASS_NAME,
host_users=False, # User namespace isolation — remaps container UIDs to unprivileged host UIDs
volumes=[
k8s_client.V1Volume(
name="script-volume",
Expand Down
254 changes: 172 additions & 82 deletions backend/app/services/k8s_worker/worker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import time
from pathlib import Path
from typing import Any

import structlog
from kubernetes_asyncio import client as k8s_client
Expand Down Expand Up @@ -56,14 +55,14 @@ def __init__(
# Kubernetes clients created from ApiClient
self.v1 = k8s_client.CoreV1Api(api_client)
self.apps_v1 = k8s_client.AppsV1Api(api_client)
self.networking_v1 = k8s_client.NetworkingV1Api(api_client)

# Components
self.pod_builder = PodBuilder(settings=settings)
self.producer = producer

# State tracking
self._active_creations: set[str] = set()
self._creation_semaphore = asyncio.Semaphore(self._settings.K8S_MAX_CONCURRENT_PODS)

self.logger.info(f"KubernetesWorker initialized for namespace {self._settings.K8S_NAMESPACE}")

Expand Down Expand Up @@ -104,52 +103,51 @@ async def handle_delete_pod_command(self, command: DeletePodCommandEvent) -> Non

async def _create_pod_for_execution(self, command: CreatePodCommandEvent) -> None:
"""Create pod for execution"""
async with self._creation_semaphore:
execution_id = command.execution_id
self._active_creations.add(execution_id)
self.metrics.update_active_pod_creations(len(self._active_creations))
execution_id = command.execution_id
self._active_creations.add(execution_id)
self.metrics.update_active_pod_creations(len(self._active_creations))

start_time = time.time()
start_time = time.time()

try:
script_content = command.script
entrypoint_content = await self._get_entrypoint_script()
try:
script_content = command.script
entrypoint_content = await self._get_entrypoint_script()

# Create ConfigMap
config_map = self.pod_builder.build_config_map(
command=command, script_content=script_content, entrypoint_content=entrypoint_content
)
# Create ConfigMap
config_map = self.pod_builder.build_config_map(
command=command, script_content=script_content, entrypoint_content=entrypoint_content
)

await self._create_config_map(config_map)
await self._create_config_map(config_map)

pod = self.pod_builder.build_pod_manifest(command=command)
created_pod = await self._create_pod(pod)
pod = self.pod_builder.build_pod_manifest(command=command)
created_pod = await self._create_pod(pod)

# Set ownerReference so K8s garbage-collects the ConfigMap when the pod is deleted
if created_pod and created_pod.metadata and created_pod.metadata.uid:
await self._set_configmap_owner(config_map, created_pod)
# Set ownerReference so K8s garbage-collects the ConfigMap when the pod is deleted
if created_pod and created_pod.metadata and created_pod.metadata.uid:
await self._set_configmap_owner(config_map, created_pod)

# Publish PodCreated event
await self._publish_pod_created(command, pod)
# Publish PodCreated event
await self._publish_pod_created(command, pod)

# Update metrics
duration = time.time() - start_time
self.metrics.record_k8s_pod_creation_duration(duration, command.language)
self.metrics.record_k8s_pod_created("success", command.language)
# Update metrics
duration = time.time() - start_time
self.metrics.record_k8s_pod_creation_duration(duration, command.language)
self.metrics.record_k8s_pod_created("success", command.language)

self.logger.info(
f"Successfully created pod {pod.metadata.name} for execution {execution_id}. "
f"Duration: {duration:.2f}s"
)
self.logger.info(
f"Successfully created pod {pod.metadata.name} for execution {execution_id}. "
f"Duration: {duration:.2f}s"
)

except Exception as e:
self.logger.error(f"Failed to create pod for execution {execution_id}: {e}", exc_info=True)
self.metrics.record_k8s_pod_created("failed", "unknown")
await self._publish_pod_creation_failed(command, str(e))
except Exception as e:
self.logger.error(f"Failed to create pod for execution {execution_id}: {e}", exc_info=True)
self.metrics.record_k8s_pod_created("failed", "unknown")
await self._publish_pod_creation_failed(command, str(e))

finally:
self._active_creations.discard(execution_id)
self.metrics.update_active_pod_creations(len(self._active_creations))
finally:
self._active_creations.discard(execution_id)
self.metrics.update_active_pod_creations(len(self._active_creations))

async def _get_entrypoint_script(self) -> str:
"""Get entrypoint script content"""
Expand Down Expand Up @@ -252,62 +250,154 @@ async def _publish_pod_creation_failed(self, command: CreatePodCommandEvent, err
)
await self.producer.produce(event_to_produce=event, key=command.execution_id)

async def ensure_namespace_security(self) -> None:
    """Harden the executor namespace at startup.

    Applies, in order:
    - a default-deny NetworkPolicy for executor pods (blocks lateral
      movement and exfiltration),
    - a ResourceQuota capping aggregate CPU/memory (no pod count limit),
    - Pod Security Admission labels (Restricted profile).
    """
    target_ns = self._settings.K8S_NAMESPACE
    for apply_control in (
        self._ensure_executor_network_policy,
        self._ensure_executor_resource_quota,
        self._apply_psa_labels,
    ):
        await apply_control(target_ns)

async def _ensure_executor_network_policy(self, namespace: str) -> None:
    """Create or update a default-deny NetworkPolicy for executor pods.

    Selecting both policy types with empty ingress/egress rule lists denies
    all inbound and outbound traffic for pods labeled ``component=executor``.
    """
    policy_name = "executor-deny-all"

    policy = k8s_client.V1NetworkPolicy(
        api_version="networking.k8s.io/v1",
        kind="NetworkPolicy",
        metadata=k8s_client.V1ObjectMeta(
            name=policy_name,
            namespace=namespace,
            labels={"app": "integr8s", "component": "security"},
        ),
        spec=k8s_client.V1NetworkPolicySpec(
            pod_selector=k8s_client.V1LabelSelector(match_labels={"component": "executor"}),
            policy_types=["Ingress", "Egress"],
            ingress=[],  # no allowed peers -> all inbound traffic denied
            egress=[],   # no allowed peers -> all outbound traffic denied
        ),
    )

    # Server-side apply (PATCH with apply-patch content type): idempotent —
    # creates the policy when absent, reconciles it when it has drifted.
    await self.networking_v1.patch_namespaced_network_policy(  # type: ignore[call-arg]
        name=policy_name, namespace=namespace, body=policy,
        field_manager="integr8s", force=True,
        _content_type="application/apply-patch+yaml",
    )
    self.logger.info(f"NetworkPolicy '{policy_name}' applied in namespace {namespace}")

async def _ensure_executor_resource_quota(self, namespace: str) -> None:
    """Create or update a ResourceQuota bounding total CPU/memory for executor pods."""
    quota_name = "executor-quota"
    cpu_cap = self._settings.K8S_QUOTA_CPU
    memory_cap = self._settings.K8S_QUOTA_MEMORY

    # Cap both requests and limits so neither over-requesting nor
    # over-limiting workloads can exhaust the namespace.
    quota_body = k8s_client.V1ResourceQuota(
        api_version="v1",
        kind="ResourceQuota",
        metadata=k8s_client.V1ObjectMeta(
            name=quota_name,
            namespace=namespace,
            labels={"app": "integr8s", "component": "security"},
        ),
        spec=k8s_client.V1ResourceQuotaSpec(
            hard={
                "requests.cpu": cpu_cap,
                "requests.memory": memory_cap,
                "limits.cpu": cpu_cap,
                "limits.memory": memory_cap,
            },
        ),
    )

    # Server-side apply: create-or-reconcile in a single idempotent call.
    await self.v1.patch_namespaced_resource_quota(  # type: ignore[call-arg]
        name=quota_name, namespace=namespace, body=quota_body,
        field_manager="integr8s", force=True,
        _content_type="application/apply-patch+yaml",
    )
    self.logger.info(f"ResourceQuota '{quota_name}' applied in namespace {namespace}")

Comment thread
HardMax71 marked this conversation as resolved.
async def _apply_psa_labels(self, namespace: str) -> None:
    """Apply Pod Security Admission labels (Restricted profile) to the executor namespace.

    ``enforce`` rejects non-conforming pods at admission; ``warn`` and
    ``audit`` report violations without blocking.
    """
    psa_labels = {
        "pod-security.kubernetes.io/enforce": "restricted",
        "pod-security.kubernetes.io/enforce-version": "latest",
        "pod-security.kubernetes.io/warn": "restricted",
        "pod-security.kubernetes.io/audit": "restricted",
    }

    # Strategic-merge patch: only the listed labels are added/updated,
    # existing namespace labels are preserved.
    await self.v1.patch_namespace(name=namespace, body={"metadata": {"labels": psa_labels}})
    self.logger.info(f"Pod Security Admission labels applied to namespace {namespace}")

async def ensure_image_pre_puller_daemonset(self) -> None:
"""Ensure the runtime image pre-puller DaemonSet exists."""
daemonset_name = "runtime-image-pre-puller"
namespace = self._settings.K8S_NAMESPACE

try:
init_containers = []
init_containers: list[k8s_client.V1Container] = []
all_images = {config.image for lang in RUNTIME_REGISTRY.values() for config in lang.values()}

psa_security_context = k8s_client.V1SecurityContext(
allow_privilege_escalation=False,
capabilities=k8s_client.V1Capabilities(drop=["ALL"]),
run_as_non_root=True,
run_as_user=65534,
seccomp_profile=k8s_client.V1SeccompProfile(type="RuntimeDefault"),
)

minimal_resources = k8s_client.V1ResourceRequirements(
requests={"cpu": "10m", "memory": "8Mi"},
limits={"cpu": "100m", "memory": "32Mi"},
)

for i, image_ref in enumerate(sorted(all_images)):
sanitized_image_ref = image_ref.split("/")[-1].replace(":", "-").replace(".", "-").replace("_", "-")
self.logger.info(f"DAEMONSET: before: {image_ref} -> {sanitized_image_ref}")
container_name = f"pull-{i}-{sanitized_image_ref}"
init_containers.append(
{
"name": container_name,
"image": image_ref,
"command": ["/bin/sh", "-c", f'echo "Image {image_ref} pulled."'],
"imagePullPolicy": "Always",
}
)

manifest: dict[str, Any] = {
"apiVersion": "apps/v1",
"kind": "DaemonSet",
"metadata": {"name": daemonset_name, "namespace": namespace},
"spec": {
"selector": {"matchLabels": {"name": daemonset_name}},
"template": {
"metadata": {"labels": {"name": daemonset_name}},
"spec": {
"initContainers": init_containers,
"containers": [{"name": "pause", "image": "registry.k8s.io/pause:3.9"}],
"tolerations": [{"operator": "Exists"}],
},
},
"updateStrategy": {"type": "RollingUpdate"},
},
}

try:
await self.apps_v1.read_namespaced_daemon_set(name=daemonset_name, namespace=namespace)
self.logger.info(f"DaemonSet '{daemonset_name}' exists. Replacing to ensure it is up-to-date.")
await self.apps_v1.replace_namespaced_daemon_set(
name=daemonset_name, namespace=namespace, body=manifest # type: ignore[arg-type]
)
self.logger.info(f"DaemonSet '{daemonset_name}' replaced successfully.")
except ApiException as e:
if e.status == 404:
self.logger.info(f"DaemonSet '{daemonset_name}' not found. Creating...")
await self.apps_v1.create_namespaced_daemon_set(
namespace=namespace, body=manifest # type: ignore[arg-type]
)
self.logger.info(f"DaemonSet '{daemonset_name}' created successfully.")
else:
raise
init_containers.append(k8s_client.V1Container(
name=f"pull-{i}-{sanitized_image_ref}",
image=image_ref,
command=["/bin/sh", "-c", f'echo "Image {image_ref} pulled."'],
image_pull_policy="Always",
security_context=psa_security_context,
resources=minimal_resources,
))

daemonset = k8s_client.V1DaemonSet(
api_version="apps/v1",
kind="DaemonSet",
metadata=k8s_client.V1ObjectMeta(name=daemonset_name, namespace=namespace),
spec=k8s_client.V1DaemonSetSpec(
selector=k8s_client.V1LabelSelector(match_labels={"name": daemonset_name}),
template=k8s_client.V1PodTemplateSpec(
metadata=k8s_client.V1ObjectMeta(labels={"name": daemonset_name}),
spec=k8s_client.V1PodSpec(
init_containers=init_containers,
containers=[k8s_client.V1Container(
name="pause", image="registry.k8s.io/pause:3.9",
security_context=psa_security_context,
resources=minimal_resources,
)],
tolerations=[k8s_client.V1Toleration(operator="Exists")],
security_context=k8s_client.V1PodSecurityContext(
run_as_non_root=True,
run_as_user=65534,
seccomp_profile=k8s_client.V1SeccompProfile(type="RuntimeDefault"),
),
),
),
update_strategy=k8s_client.V1DaemonSetUpdateStrategy(type="RollingUpdate"),
),
)

await self.apps_v1.patch_namespaced_daemon_set( # type: ignore[call-arg]
name=daemonset_name, namespace=namespace, body=daemonset,
field_manager="integr8s", force=True,
_content_type="application/apply-patch+yaml",
)
self.logger.info(f"DaemonSet '{daemonset_name}' applied successfully")

except ApiException as e:
self.logger.error(f"K8s API error applying DaemonSet '{daemonset_name}': {e.reason}", exc_info=True)
Expand Down
2 changes: 1 addition & 1 deletion backend/app/services/pod_monitor/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class PodMonitorConfig:
# Watch settings
label_selector: str = "app=integr8s,component=executor"
field_selector: str | None = None
watch_timeout_seconds: int = 300 # 5 minutes
watch_timeout_seconds: int = 30 # 30 seconds — short enough for APScheduler 5s interval

# Monitoring settings
enable_metrics: bool = True
Expand Down
Loading
Loading