Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.

Commit fed2db1

Browse files
committed
Improve logging infrastructure
1 parent 6b9d788 commit fed2db1

5 files changed

Lines changed: 101 additions & 13 deletions

File tree

packages/jumpstarter/jumpstarter/driver/base.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@
2727
MARKER_STREAMCALL,
2828
MARKER_STREAMING_DRIVERCALL,
2929
)
30-
from jumpstarter.common import Metadata
30+
from jumpstarter.common import LogSource, Metadata
3131
from jumpstarter.common.resources import ClientStreamResource, PresignedRequestResource, Resource, ResourceMetadata
32+
from jumpstarter.exporter.logging import get_logger
3233
from jumpstarter.common.serde import decode_value, encode_value
3334
from jumpstarter.common.streams import (
3435
DriverStreamRequest,
@@ -86,7 +87,7 @@ def __post_init__(self):
8687
if hasattr(super(), "__post_init__"):
8788
super().__post_init__()
8889

89-
self.logger = logging.getLogger(self.__class__.__name__)
90+
self.logger = get_logger(f"driver.{self.__class__.__name__}", LogSource.DRIVER)
9091
self.logger.setLevel(self.log_level)
9192

9293
def close(self):

packages/jumpstarter/jumpstarter/exporter/exporter.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ async def status(retries=5, backoff=3):
240240
# Shield the post-lease hook from cancellation and await it
241241
with CancelScope(shield=True):
242242
await self._update_status(ExporterStatus.AFTER_LEASE_HOOK, "Running afterLease hooks")
243+
# Pass the current session to hook executor for logging
244+
self.hook_executor.main_session = self._current_session
243245
await self.hook_executor.execute_post_lease_hook(hook_context)
244246
await self._update_status(ExporterStatus.AVAILABLE, "Available for new lease")
245247

@@ -279,6 +281,8 @@ async def run_before_lease_hook(hook_ctx):
279281
await self._update_status(
280282
ExporterStatus.BEFORE_LEASE_HOOK, "Running beforeLease hooks"
281283
)
284+
# Pass the current session to hook executor for logging
285+
self.hook_executor.main_session = self._current_session
282286
await self.hook_executor.execute_pre_lease_hook(hook_ctx)
283287
await self._update_status(ExporterStatus.LEASE_READY, "Ready for commands")
284288
logger.info("beforeLease hook completed successfully")
@@ -311,6 +315,8 @@ async def run_before_lease_hook(hook_ctx):
311315
# Shield the post-lease hook from cancellation and await it
312316
with CancelScope(shield=True):
313317
await self._update_status(ExporterStatus.AFTER_LEASE_HOOK, "Running afterLease hooks")
318+
# Pass the current session to hook executor for logging
319+
self.hook_executor.main_session = self._current_session
314320
await self.hook_executor.execute_post_lease_hook(hook_context)
315321
await self._update_status(ExporterStatus.AVAILABLE, "Available for new lease")
316322

packages/jumpstarter/jumpstarter/exporter/hooks.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
from dataclasses import dataclass, field
88
from typing import Callable
99

10+
from jumpstarter.common import LogSource
1011
from jumpstarter.config.env import JMP_DRIVERS_ALLOW, JUMPSTARTER_HOST
1112
from jumpstarter.config.exporter import HookConfigV1Alpha1
1213
from jumpstarter.driver import Driver
14+
from jumpstarter.exporter.logging import get_logger
1315
from jumpstarter.exporter.session import Session
1416

1517
logger = logging.getLogger(__name__)
@@ -32,6 +34,7 @@ class HookExecutor:
3234

3335
config: HookConfigV1Alpha1
3436
device_factory: Callable[[], Driver]
37+
main_session: Session | None = field(default=None)
3538
timeout: int = field(init=False)
3639

3740
def __post_init__(self):
@@ -63,17 +66,17 @@ async def _create_hook_environment(self, context: HookContext):
6366
}
6467
)
6568

66-
yield hook_env
69+
yield session, hook_env
6770

68-
async def _execute_hook(self, command: str, context: HookContext) -> bool:
71+
async def _execute_hook(self, command: str, context: HookContext, log_source: LogSource) -> bool:
6972
"""Execute a single hook command."""
7073
if not command or not command.strip():
7174
logger.debug("Hook command is empty, skipping")
7275
return True
7376

7477
logger.info("Executing hook: %s", command.strip().split("\n")[0][:100])
7578

76-
async with self._create_hook_environment(context) as hook_env:
79+
async with self._create_hook_environment(context) as (session, hook_env):
7780
try:
7881
# Execute the hook command using shell
7982
process = await asyncio.create_subprocess_shell(
@@ -84,6 +87,12 @@ async def _execute_hook(self, command: str, context: HookContext) -> bool:
8487
)
8588

8689
try:
90+
# Determine which session to use for logging - prefer main session if available
91+
logging_session = self.main_session if self.main_session is not None else session
92+
93+
# Create a logger with automatic source registration
94+
hook_logger = get_logger(f"hook.{context.lease_name}", log_source, logging_session)
95+
8796
# Stream output line-by-line for real-time logging
8897
output_lines = []
8998

@@ -94,7 +103,8 @@ async def read_output():
94103
break
95104
line_decoded = line.decode().rstrip()
96105
output_lines.append(line_decoded)
97-
logger.info("[Hook Output] %s", line_decoded)
106+
# Route hook output through the logging system
107+
hook_logger.info(line_decoded)
98108

99109
# Run output reading and process waiting concurrently with timeout
100110
await asyncio.wait_for(asyncio.gather(read_output(), process.wait()), timeout=self.timeout)
@@ -104,8 +114,6 @@ async def read_output():
104114
return True
105115
else:
106116
logger.error("Hook failed with return code %d", process.returncode)
107-
if output_lines:
108-
logger.error("Hook output: %s", "\n".join(output_lines))
109117
return False
110118

111119
except asyncio.TimeoutError:
@@ -129,7 +137,7 @@ async def execute_pre_lease_hook(self, context: HookContext) -> bool:
129137
return True
130138

131139
logger.info("Executing pre-lease hook for lease %s", context.lease_name)
132-
return await self._execute_hook(self.config.pre_lease, context)
140+
return await self._execute_hook(self.config.pre_lease, context, LogSource.BEFORE_LEASE_HOOK)
133141

134142
async def execute_post_lease_hook(self, context: HookContext) -> bool:
135143
"""Execute the post-lease hook."""
@@ -138,4 +146,4 @@ async def execute_post_lease_hook(self, context: HookContext) -> bool:
138146
return True
139147

140148
logger.info("Executing post-lease hook for lease %s", context.lease_name)
141-
return await self._execute_hook(self.config.post_lease, context)
149+
return await self._execute_hook(self.config.post_lease, context, LogSource.AFTER_LEASE_HOOK)
Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,90 @@
11
import logging
22
from collections import deque
3+
from contextlib import contextmanager
4+
from threading import RLock
5+
from typing import TYPE_CHECKING
36

47
from jumpstarter_protocol import jumpstarter_pb2
58

69
from jumpstarter.common import LogSource
710

11+
if TYPE_CHECKING:
12+
from .session import Session
13+
814

915
class LogHandler(logging.Handler):
1016
def __init__(self, queue: deque, source: LogSource = LogSource.UNSPECIFIED):
1117
logging.Handler.__init__(self)
1218
self.queue = queue
1319
self.listener = None
1420
self.source = source # LogSource enum value
21+
self._lock = RLock()
22+
self._child_handlers = {} # Dict of logger_name -> LogSource mappings
23+
24+
def add_child_handler(self, logger_name: str, source: LogSource):
25+
"""Add a child handler that will route logs from a specific logger with a different source."""
26+
with self._lock:
27+
self._child_handlers[logger_name] = source
28+
29+
def remove_child_handler(self, logger_name: str):
30+
"""Remove a child handler mapping."""
31+
with self._lock:
32+
self._child_handlers.pop(logger_name, None)
33+
34+
def get_source_for_record(self, record):
35+
"""Determine the appropriate log source for a record."""
36+
with self._lock:
37+
# Check if this record comes from a logger with a specific source mapping
38+
logger_name = record.name
39+
for mapped_logger, source in self._child_handlers.items():
40+
if logger_name.startswith(mapped_logger):
41+
return source
42+
return self.source
1543

1644
def enqueue(self, record):
1745
self.queue.append(record)
1846

1947
def prepare(self, record):
48+
source = self.get_source_for_record(record)
2049
return jumpstarter_pb2.LogStreamResponse(
2150
uuid="",
2251
severity=record.levelname,
2352
message=self.format(record),
24-
source=self.source.value, # Convert to proto value
53+
source=source.value, # Convert to proto value
2554
)
2655

2756
def emit(self, record):
2857
try:
2958
self.enqueue(self.prepare(record))
3059
except Exception:
3160
self.handleError(record)
61+
62+
@contextmanager
63+
def context_log_source(self, logger_name: str, source: LogSource):
64+
"""Context manager to temporarily set a log source for a specific logger."""
65+
self.add_child_handler(logger_name, source)
66+
try:
67+
yield
68+
finally:
69+
self.remove_child_handler(logger_name)
70+
71+
72+
def get_logger(name: str, source: LogSource = LogSource.SYSTEM, session: "Session" = None) -> logging.Logger:
73+
"""
74+
Get a logger with automatic LogSource mapping.
75+
76+
Args:
77+
name: Logger name (e.g., __name__ or custom name)
78+
source: The LogSource to associate with this logger
79+
session: Optional session to register with immediately
80+
81+
Returns:
82+
A standard Python logger instance
83+
"""
84+
logger = logging.getLogger(name)
85+
86+
# If session provided, register the source mapping
87+
if session:
88+
session.add_logger_source(name, source)
89+
90+
return logger

packages/jumpstarter/jumpstarter/exporter/session.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717
)
1818

1919
from .logging import LogHandler
20-
from jumpstarter.common import ExporterStatus, Metadata, TemporarySocket
21-
from jumpstarter.common.enums import LogSource
20+
from jumpstarter.common import ExporterStatus, LogSource, Metadata, TemporarySocket
2221
from jumpstarter.common.streams import StreamRequestMetadata
2322
from jumpstarter.driver import Driver
2423
from jumpstarter.streams.common import forward_stream
@@ -74,6 +73,9 @@ def __init__(self, *args, root_device, **kwargs):
7473
self._logging_handler = LogHandler(self._logging_queue, LogSource.SYSTEM)
7574
self._status_update_event = Event()
7675

76+
# Map all driver logs to DRIVER source
77+
self._logging_handler.add_child_handler("driver.", LogSource.DRIVER)
78+
7779
@asynccontextmanager
7880
async def serve_port_async(self, port):
7981
server = grpc.aio.server()
@@ -153,6 +155,18 @@ def update_status(self, status: int | ExporterStatus, message: str = ""):
153155
self._current_status = status
154156
self._status_message = message
155157

158+
def add_logger_source(self, logger_name: str, source: LogSource):
159+
"""Add a log source mapping for a specific logger."""
160+
self._logging_handler.add_child_handler(logger_name, source)
161+
162+
def remove_logger_source(self, logger_name: str):
163+
"""Remove a log source mapping for a specific logger."""
164+
self._logging_handler.remove_child_handler(logger_name)
165+
166+
def context_log_source(self, logger_name: str, source: LogSource):
167+
"""Context manager to temporarily set a log source for a specific logger."""
168+
return self._logging_handler.context_log_source(logger_name, source)
169+
156170
async def GetStatus(self, request, context):
157171
"""Get the current exporter status."""
158172
logger.debug("GetStatus() -> %s", self._current_status)

0 commit comments

Comments
 (0)