Skip to content

Commit 6c5d3bc

Browse files
committed
Decouple metrics calculation from Python (moved to a strict shell-only entrypoint); move metrics processing from the k8s service (now responsible only for spinning up/starting pods and returning their data) to the execution service.
1 parent 517740e commit 6c5d3bc

5 files changed

Lines changed: 153 additions & 143 deletions

File tree

backend/app/scripts/entrypoint.py

Lines changed: 0 additions & 67 deletions
This file was deleted.

backend/app/scripts/entrypoint.sh

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#!/bin/sh
# Pod entrypoint: run the supplied command, sample its memory/CPU usage from
# /proc while it runs, and emit exactly one JSON object on stdout containing
# the exit code, resource usage, and captured stdout/stderr.
#
# NOTE(review): this aims at portable POSIX sh, but two constructs are
# extensions commonly available in GNU/BusyBox userlands — `date +%s.%N`
# (nanosecond precision) and fractional `sleep 0.05`. Confirm the container
# image ships them (busybox and coreutils both do).

# Escape a stream for safe embedding inside a JSON string value.
json_escape() {
    # Slurp the whole input into pattern space (label/branch written as
    # separate -e expressions: `:a;N` with a semicolon after a label is a
    # GNU-sed-only form), then escape backslash, quote, newline, tab, CR.
    sed -e ':a' -e 'N' -e '$!ba' \
        -e 's/\\/\\\\/g' \
        -e 's/"/\\"/g' \
        -e 's/\n/\\n/g' \
        -e 's/\t/\\t/g' \
        -e 's/\r/\\r/g'
}

# Exit immediately (but successfully, so the pod log stays parseable JSON)
# if no command was provided.
if [ "$#" -eq 0 ]; then
    printf '{"exit_code": 127, "resource_usage": null, "stdout": "", "stderr": "Entrypoint Error: No command provided."}'
    exit 0
fi

# Create temporary files for stdout and stderr. Abort if mktemp fails —
# `set -e` covers exactly this setup section and is switched off again before
# the user command runs, so a failing user command cannot kill this script.
set -e
OUT=$(mktemp)
ERR=$(mktemp)
trap 'rm -f "$OUT" "$ERR"' EXIT
set +e

# Record start time (%s.%N: seconds.nanoseconds — see portability note above).
START_TIME=$(date +%s.%N)

# Run the user command in a background subshell so its failure cannot crash
# this wrapper; redirect its streams into the temp files.
( "$@" >"$OUT" 2>"$ERR" ) &
PID=$!

PEAK_KB=0
JIFS=0

# Poll /proc while the process directory exists — the most reliable liveness
# check available to plain sh. All reads are silenced: the process can vanish
# between the -d test and the read (benign race).
while [ -d "/proc/$PID" ]; do
    # VmHWM = peak resident set ("high water mark"), in KiB.
    CUR_KB=$(grep VmHWM "/proc/$PID/status" 2>/dev/null | awk '{print $2}')
    if [ -n "$CUR_KB" ] && [ "$CUR_KB" -gt "$PEAK_KB" ]; then
        PEAK_KB=$CUR_KB
    fi

    # Fields 14+15 of /proc/<pid>/stat are utime+stime in clock ticks.
    # awk tolerates empty/malformed input without crashing the shell.
    CPU_JIFFIES=$(awk '{print $14 + $15}' "/proc/$PID/stat" 2>/dev/null)
    if [ -n "$CPU_JIFFIES" ]; then
        JIFS=$CPU_JIFFIES
    fi
    sleep 0.05
done

# Reap the child and collect its exit code. Works because this wrapper is the
# pod's PID 1's direct child manager for the command (we spawned it).
wait "$PID"
EXIT_CODE=$?

END_TIME=$(date +%s.%N)
# Floating-point elapsed time via awk, fixed to 6 decimal places.
ELAPSED_S=$(awk -v end="$END_TIME" -v start="$START_TIME" 'BEGIN { printf "%.6f", end - start }')

# Clock ticks per second, defaulting defensively to the common value 100.
CLK_TCK=$(getconf CLK_TCK 2>/dev/null || printf "100")

# Escape the captured streams ONCE and reuse the results below (the previous
# version computed these and then re-ran the whole cat|escape pipeline again
# inside the heredoc, discarding these variables).
OUT_JSON=$(json_escape < "$OUT")
ERR_JSON=$(json_escape < "$ERR")

# Every interpolated value carries a default so the JSON stays well-formed
# even if a sampling step never ran.
json=$(cat <<EOF
{
    "exit_code": ${EXIT_CODE:-1},
    "resource_usage": {
        "execution_time_wall_seconds": ${ELAPSED_S:-0},
        "cpu_time_jiffies": ${JIFS:-0},
        "clk_tck_hertz": ${CLK_TCK:-100},
        "peak_memory_kb": ${PEAK_KB:-0}
    },
    "stdout": "$OUT_JSON",
    "stderr": "$ERR_JSON"
}
EOF
)

# Final output — no trailing newline; the consumer parses the log verbatim.
printf '%s' "$json"

backend/app/services/execution_service.py

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -114,56 +114,61 @@ async def _get_k8s_execution_output(
114114
return output, error_msg, phase, resource_usage
115115

116116
async def _try_finalize_execution(self, execution: ExecutionInDB) -> Optional[ExecutionInDB]:
117-
"""
118-
Checks K8s for a final status. If found, updates the database and
119-
returns the updated execution object. Otherwise, returns None.
120-
"""
121-
output, _, final_phase, resources = await self._get_k8s_execution_output(execution.id)
117+
try:
118+
metrics, final_phase = await self.k8s_service.get_pod_logs(execution.id)
119+
except KubernetesPodError as e:
120+
logger.error(f"K8s pod error finalizing execution {execution.id}: {e}")
121+
update_data = {"status": ExecutionStatus.ERROR, "errors": str(e), "resource_usage": {"pod_phase": "Error"}}
122+
except Exception as e:
123+
logger.error(f"Unexpected error finalizing execution {execution.id}: {e}", exc_info=True)
124+
update_data = {"status": ExecutionStatus.ERROR, "errors": f"Unexpected infrastructure error: {e}",
125+
"resource_usage": {"pod_phase": "Error"}}
126+
else:
127+
logger.info(f"Successfully parsed metrics from pod: {metrics}")
128+
129+
exit_code = metrics.get("exit_code")
130+
res_usage = metrics.get("resource_usage")
131+
132+
if not res_usage:
133+
return None # waiting for results
122134

123-
update_data = {}
135+
wall_s = res_usage.get("execution_time_wall_seconds") or 0
136+
jiffies = float(res_usage.get("cpu_time_jiffies", 0) or 0)
137+
hertz = float(res_usage.get("clk_tck_hertz", 100) or 100)
138+
cpu_s = jiffies / hertz if hertz > 0 else 0.0 # total CPU-time
124139

125-
# Now we only have basic metrics from K8s
126-
if resources:
127-
exit_code = resources.get("exit_code", 1)
140+
# average CPU in millicores: (CPU-seconds / wall-seconds) × 1000
141+
cpu_millicores = (cpu_s / wall_s * 1000) if wall_s else 0.0
142+
143+
# VmHWM is k *ibi*bytes → MiB = KiB / 1024
144+
peak_kib = float(res_usage.get("peak_memory_kb", 0) or 0)
145+
peak_mib = peak_kib / 1024.0
146+
147+
final_resource_usage = {
148+
"execution_time": round(wall_s, 6),
149+
"cpu_usage": round(cpu_millicores, 2),
150+
"memory_usage": round(peak_mib, 2),
151+
}
152+
153+
final_resource_usage["pod_phase"] = final_phase
128154

129155
if exit_code == 0:
130156
update_data = {
131157
"status": ExecutionStatus.COMPLETED,
132-
"output": output or "",
158+
"output": metrics.get("stdout", ""),
133159
"errors": None,
134-
"resource_usage": resources # Only has exit_code, execution_time, pod_phase
160+
"resource_usage": final_resource_usage,
135161
}
136162
else:
137-
# Script failed - output contains stderr/stdout
138-
error_details = output or f"Script failed with exit code {exit_code}"
163+
error_details = metrics.get("stderr") or f"Script failed with exit code {exit_code}."
139164
update_data = {
140165
"status": ExecutionStatus.ERROR,
141-
"output": "",
166+
"output": metrics.get("stdout", ""),
142167
"errors": error_details,
143-
"resource_usage": resources
168+
"resource_usage": final_resource_usage,
144169
}
145-
else:
146-
# No metrics at all - use pod phase
147-
if final_phase == "Succeeded":
148-
update_data = {
149-
"status": ExecutionStatus.COMPLETED,
150-
"output": output or "",
151-
"errors": None,
152-
"resource_usage": {"pod_phase": final_phase}
153-
}
154-
else:
155-
error_details = output or f"Pod failed with phase '{final_phase}'"
156-
update_data = {
157-
"status": ExecutionStatus.ERROR,
158-
"output": "",
159-
"errors": error_details,
160-
"resource_usage": {"pod_phase": final_phase}
161-
}
162-
163-
if not update_data:
164-
return None
165170

166-
logger.info(f"Finalizing execution {execution.id} with status: {update_data['status']}")
171+
logger.info(f"Finalizing execution {execution.id} with status: {update_data.get('status', 'unknown')}")
167172
update_payload = ExecutionUpdate(**update_data).model_dump(exclude_unset=True)
168173
await self.execution_repo.update_execution(execution.id, update_payload)
169174

@@ -219,8 +224,8 @@ async def execute_script(
219224
ExecutionUpdate(status=ExecutionStatus.ERROR, errors=str(e)).model_dump(exclude_unset=True)
220225
)
221226
raise IntegrationException(status_code=500,
222-
detail=f"Internal server error during script execution request: "
223-
f"{str(e)}") from e
227+
detail=f"Internal server error during script execution request: "
228+
f"{str(e)}") from e
224229
finally:
225230
EXECUTION_DURATION.labels(python_version=python_version).observe(time() - start_time)
226231
ACTIVE_EXECUTIONS.dec()

backend/app/services/kubernetes_service.py

Lines changed: 25 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
import ast
12
import asyncio
2-
import json
33
import os
44
from datetime import datetime, timedelta, timezone
55
from pathlib import Path
@@ -97,7 +97,11 @@ async def graceful_shutdown(self) -> None:
9797
logger.warning("Shutdown timeout reached, forcing pod termination")
9898
break
9999
try:
100-
await self._cleanup_pod_resources(pod_name)
100+
if not pod_name.startswith("execution-"):
101+
return
102+
execution_id = pod_name[len("execution-"):]
103+
config_map_name = f"script-{execution_id}"
104+
await self._cleanup_resources(pod_name, config_map_name)
101105
except Exception as e:
102106
logger.error(f"Error during pod cleanup on shutdown: {str(e)}")
103107

@@ -156,18 +160,18 @@ async def create_execution_pod(
156160
pod_name = f"execution-{execution_id}"
157161

158162
try:
159-
entrypoint_script_path = Path("app/scripts/entrypoint.py")
163+
entrypoint_script_path = Path("app/scripts/entrypoint.sh")
160164
entrypoint_code = await asyncio.to_thread(entrypoint_script_path.read_text)
161165

162-
config_map_data["entrypoint.py"] = entrypoint_code
166+
config_map_data["entrypoint.sh"] = entrypoint_code
163167

164168
config_map_body = k8s_client.V1ConfigMap(
165169
metadata=k8s_client.V1ObjectMeta(name=config_map_name),
166170
data=config_map_data
167171
)
168172
await self._create_config_map(config_map_body)
169173

170-
final_pod_command = ["/scripts/entrypoint.py"] + command
174+
final_pod_command = ["/bin/sh", "/scripts/entrypoint.sh", *command]
171175

172176
builder = PodManifestBuilder(
173177
execution_id=execution_id,
@@ -193,44 +197,34 @@ async def create_execution_pod(
193197
await self._cleanup_resources(pod_name, config_map_name)
194198
raise KubernetesPodError(f"Failed to create execution pod: {str(e)}") from e
195199

196-
async def get_pod_logs(self, execution_id: str) -> tuple[str, str, dict]:
197-
# This method reverts to the simple version that parses the clean log output
200+
async def get_pod_logs(self, execution_id: str) -> tuple[dict, str]:
198201
pod_name = f"execution-{execution_id}"
199202
config_map_name = f"script-{execution_id}"
200-
201203
try:
202204
pod = await self._wait_for_pod_completion(pod_name)
203205
pod_phase = pod.status.phase if pod and pod.status else "Unknown"
204206
full_logs = await self._get_container_logs(pod_name, "script-runner")
207+
logger.info(f"Raw logs from pod {pod_name}:\n---\n{full_logs}\n---")
205208

206-
# The simple, reliable parser for the ###METRICS### block
207-
output, metrics = self._extract_execution_metrics(full_logs)
208-
209-
final_exit_code = metrics.get("exit_code", 1)
210-
metrics["pod_phase"] = pod_phase
211-
metrics["status"] = "completed" if final_exit_code == 0 else "error"
212-
213-
return output, pod_phase, metrics
209+
try:
210+
# https://stackoverflow.com/questions/15197673/using-pythons-eval-vs-ast-literal-eval
211+
metrics = ast.literal_eval(full_logs)
212+
return metrics, pod_phase
213+
except (ValueError, SyntaxError, TypeError) as e:
214+
logger.error(f"FAILED TO PARSE LOGS FROM POD {pod_name} as a Python literal: {e}")
215+
error_payload = {
216+
"exit_code": -1,
217+
"stdout": "",
218+
"stderr": f"Internal execution error: Pod logs were not valid JSON. "
219+
f"Pod phase: {pod_phase}.\nRaw Logs:\n{full_logs}",
220+
"resource_usage": None,
221+
}
222+
return error_payload, pod_phase
214223
finally:
215224
logger.info(f"Initiating cleanup for execution '{execution_id}'...")
216225
await self._cleanup_resources(pod_name, config_map_name)
217226
self._active_pods.pop(execution_id, None)
218227

219-
def _extract_execution_metrics(self, logs: str) -> tuple[str, dict]:
220-
# This is the simple parser for the entrypoint.py output
221-
split_marker = "\n###METRICS###\n"
222-
if split_marker in logs:
223-
output, metrics_json = logs.rsplit(split_marker, 1)
224-
try:
225-
metrics_data = json.loads(metrics_json)
226-
return output.strip(), metrics_data
227-
except json.JSONDecodeError:
228-
logger.error(f"Failed to decode metrics JSON: {metrics_json}")
229-
return logs.strip(), {"error": "Failed to decode metrics JSON.", "exit_code": 1}
230-
231-
logger.warning("Metrics marker not found in logs.")
232-
return logs.strip(), {"error": "Metrics marker not found in logs.", "exit_code": 1}
233-
234228
async def _wait_for_pod_completion(self, pod_name: str) -> k8s_client.V1Pod:
235229
logger.info(f"Waiting for pod '{pod_name}' to complete...")
236230
for _ in range(self.POD_RETRY_ATTEMPTS):
@@ -292,13 +286,6 @@ async def _cleanup_resources(self, pod_name: str, config_map_name: str) -> None:
292286
except ApiException as e:
293287
logger.error(f"Failed to delete config map '{config_map_name}': {e.reason}")
294288

295-
async def _cleanup_pod_resources(self, pod_name: str) -> None:
296-
if not pod_name.startswith("execution-"):
297-
return
298-
execution_id = pod_name[len("execution-"):]
299-
config_map_name = f"script-{execution_id}"
300-
await self._cleanup_resources(pod_name, config_map_name)
301-
302289

303290
def get_k8s_manager(request: Request) -> KubernetesServiceManager:
304291
if not hasattr(request.app.state, "k8s_manager"):

backend/app/services/pod_manifest_builder.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ def build(self) -> Dict[str, Any]:
4444
"name": "script-runner",
4545
"image": self.image,
4646
"command": self.command,
47+
"args": [],
4748
"resources": {
4849
"limits": {"cpu": self.pod_cpu_limit, "memory": self.pod_memory_limit},
4950
"requests": {"cpu": self.pod_cpu_request, "memory": self.pod_memory_request},

0 commit comments

Comments
 (0)