Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.

Commit 74f0c11

Browse files
committed
Fix hooks race condition
1 parent 18fe16d · commit 74f0c11

1 file changed

Lines changed: 79 additions & 49 deletions

File tree

  • packages/jumpstarter/jumpstarter/exporter

packages/jumpstarter/jumpstarter/exporter/hooks.py

Lines changed: 79 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -1,16 +1,18 @@
11
"""Lifecycle hooks for Jumpstarter exporters."""
22

3-
import asyncio
43
import logging
54
import os
5+
import subprocess
66
from collections.abc import Awaitable
77
from dataclasses import dataclass
88
from typing import TYPE_CHECKING, Callable, Literal
99

10+
import anyio
11+
from anyio import open_process
12+
1013
from jumpstarter.common import ExporterStatus, LogSource
1114
from jumpstarter.config.env import JMP_DRIVERS_ALLOW, JUMPSTARTER_HOST
1215
from jumpstarter.config.exporter import HookConfigV1Alpha1, HookInstanceConfigV1Alpha1
13-
from jumpstarter.exporter.logging import get_logger
1416
from jumpstarter.exporter.session import Session
1517

1618
if TYPE_CHECKING:
@@ -148,64 +150,73 @@ async def _execute_hook_process(
148150
logging_session: Session,
149151
hook_type: Literal["before_lease", "after_lease"],
150152
) -> None:
151-
"""Execute the hook process with the given environment and logging session."""
153+
"""Execute the hook process with the given environment and logging session.
154+
155+
Uses anyio for subprocess execution to be compatible with the anyio-based exporter.
156+
"""
157+
158+
152159
command = hook_config.script
153160
timeout = hook_config.timeout
154161
on_failure = hook_config.on_failure
155162

156163
# Exception handling
157164
error_msg: str | None = None
158165
cause: Exception | None = None
166+
timed_out = False
159167

160168
try:
161-
# Execute the hook command using shell
162-
process = await asyncio.create_subprocess_shell(
169+
# Execute the hook command using shell via anyio
170+
# Pass the command as a string to use shell mode
171+
async with await open_process(
163172
command,
164173
env=hook_env,
165-
stdout=asyncio.subprocess.PIPE,
166-
stderr=asyncio.subprocess.STDOUT,
167-
)
168-
169-
try:
170-
# Create a logger with automatic source registration
171-
hook_logger = get_logger(f"hook.{lease_scope.lease_name}", log_source, logging_session)
172-
173-
# Stream output line-by-line for real-time logging
174-
output_lines = []
175-
176-
async def read_output():
177-
while True:
178-
line = await process.stdout.readline()
179-
if not line:
180-
break
181-
line_decoded = line.decode().rstrip()
182-
output_lines.append(line_decoded)
183-
# Route hook output through the logging system
184-
hook_logger.info(line_decoded)
185-
186-
# Run output reading and process waiting concurrently with timeout
187-
await asyncio.wait_for(asyncio.gather(read_output(), process.wait()), timeout=timeout)
174+
stdout=subprocess.PIPE,
175+
stderr=subprocess.STDOUT,
176+
) as process:
177+
output_lines: list[str] = []
178+
179+
async def read_output() -> None:
180+
"""Read stdout line by line."""
181+
assert process.stdout is not None
182+
buffer = b""
183+
async for chunk in process.stdout:
184+
buffer += chunk
185+
while b"\n" in buffer:
186+
line, buffer = buffer.split(b"\n", 1)
187+
line_decoded = line.decode().rstrip()
188+
output_lines.append(line_decoded)
189+
logger.info("[hook output] %s", line_decoded)
190+
# Handle any remaining data without newline
191+
if buffer:
192+
line_decoded = buffer.decode().rstrip()
193+
if line_decoded:
194+
output_lines.append(line_decoded)
195+
logger.info("[hook output] %s", line_decoded)
196+
197+
# Use move_on_after for timeout
198+
with anyio.move_on_after(timeout) as cancel_scope:
199+
await read_output()
200+
await process.wait()
188201

189-
# Check if hook succeeded (exit code 0)
190-
if process.returncode == 0:
202+
if cancel_scope.cancelled_caught:
203+
timed_out = True
204+
error_msg = f"Hook timed out after {timeout} seconds"
205+
logger.error(error_msg)
206+
# Terminate the process
207+
process.terminate()
208+
# Give it a moment to terminate gracefully
209+
with anyio.move_on_after(5):
210+
await process.wait()
211+
# Force kill if still running
212+
if process.returncode is None:
213+
process.kill()
214+
215+
elif process.returncode == 0:
191216
logger.info("Hook executed successfully")
192217
return
193-
194-
# Non-zero exit code is a failure
195-
error_msg = f"Hook failed with exit code {process.returncode}"
196-
197-
except asyncio.TimeoutError as e:
198-
error_msg = f"Hook timed out after {timeout} seconds"
199-
cause = e
200-
logger.error(error_msg)
201-
try:
202-
# Attempt to gracefully terminate the process
203-
process.terminate()
204-
await asyncio.wait_for(process.wait(), timeout=5)
205-
except asyncio.TimeoutError:
206-
# Force kill if it didn't terminate in time
207-
process.kill()
208-
await process.wait()
218+
else:
219+
error_msg = f"Hook failed with exit code {process.returncode}"
209220

210221
except Exception as e:
211222
error_msg = f"Error executing hook: {e}"
@@ -214,6 +225,9 @@ async def read_output():
214225

215226
# Handle failure if one occurred
216227
if error_msg is not None:
228+
# For timeout, create a TimeoutError as the cause
229+
if timed_out and cause is None:
230+
cause = TimeoutError(error_msg)
217231
self._handle_hook_failure(error_msg, on_failure, hook_type, cause)
218232

219233
async def execute_before_lease_hook(self, lease_scope: "LeaseContext") -> None:
@@ -278,7 +292,19 @@ async def run_before_lease_hook(
278292
"""
279293
try:
280294
# Wait for lease scope to be fully populated by handle_lease
281-
assert lease_scope.is_ready(), "LeaseScope must be fully initialized before running before-lease hooks"
295+
# This is necessary because handle_lease and run_before_lease_hook run concurrently
296+
timeout = 30 # seconds
297+
interval = 0.1 # seconds
298+
elapsed = 0.0
299+
while not lease_scope.is_ready():
300+
if elapsed >= timeout:
301+
error_msg = "Timeout waiting for lease scope to be ready"
302+
logger.error(error_msg)
303+
await report_status(ExporterStatus.BEFORE_LEASE_HOOK_FAILED, error_msg)
304+
lease_scope.before_lease_hook.set()
305+
return
306+
await anyio.sleep(interval)
307+
elapsed += interval
282308

283309
# Check if hook is configured
284310
if not self.config.before_lease:
@@ -351,8 +377,12 @@ async def run_after_lease_hook(
351377
shutdown: Callback to trigger exporter shutdown on critical failures
352378
"""
353379
try:
354-
# Verify lease scope is ready
355-
assert lease_scope.is_ready(), "LeaseScope must be fully initialized before running after-lease hooks"
380+
# Verify lease scope is ready - for after-lease this should always be true
381+
# since we've already processed the lease, but check defensively
382+
if not lease_scope.is_ready():
383+
logger.warning("LeaseScope not ready for after-lease hook, skipping")
384+
await report_status(ExporterStatus.AVAILABLE, "Available for new lease")
385+
return
356386

357387
# Check if hook is configured
358388
if not self.config.after_lease:

0 commit comments

Comments
 (0)