Skip to content

Commit b497cbf

Browse files
author
Mateusz
committed
feat(warmup): add sliding usage window warm-up scheduler with multi-account fan-out
Adds configurable, scheduled lightweight LLM requests that shift provider rate-limit windows. Supports explicit backend:model routes, weekend control, random jitter, retry-once for transient errors, and openai-codex multi-account fan-out.
1 parent d071816 commit b497cbf

16 files changed

Lines changed: 1326 additions & 10 deletions

src/connectors/_openai_codex_connector.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,55 @@ def _configure_managed_oauth_credential_manager(self) -> None:
647647
exc_info=True,
648648
)
649649

650+
async def list_managed_oauth_account_ids(self) -> list[str]:
    """Return eligible managed OAuth account IDs for warm-up fan-out.

    The IDs come from the credential manager's ``_managed_selector``. Accounts
    are reloaded first when the selector exposes ``reload_accounts``; then the
    "eligible" half of ``selector._available_accounts(now_ms)`` is used and
    only non-empty string IDs are kept.

    NOTE(review): any re-auth / allowed-account filtering happens inside the
    selector's ``_available_accounts``; this method does not filter beyond
    dropping empty or non-string IDs.

    Returns:
        The eligible account IDs. Always a list: it is empty when no managed
        selector is configured, when the selector has no callable
        ``_available_accounts``, when that call returns an unexpected shape,
        or when enumeration raises.
    """
    selector = getattr(self._credential_manager, "_managed_selector", None)
    if selector is None:
        return []

    try:
        reload_accounts = getattr(selector, "reload_accounts", None)
        if callable(reload_accounts):
            result = reload_accounts()
            # reload_accounts may be sync or async; await only when needed.
            if inspect.isawaitable(result):
                await result

        available_accounts_fn = getattr(selector, "_available_accounts", None)
        if not callable(available_accounts_fn):
            # Without this guard the coroutine fell off the end of the try
            # block and returned None, violating the list[str] contract.
            return []

        now_ms = int(time.time() * 1000)
        maybe_accounts = available_accounts_fn(now_ms)
        # Defensive shape check: expect an (available, eligible) pair.
        if (
            not isinstance(maybe_accounts, tuple)
            or len(maybe_accounts) != 2
            or not isinstance(maybe_accounts[0], Sequence)
            or not isinstance(maybe_accounts[1], Sequence)
        ):
            return []

        available, eligible = maybe_accounts
        if available and logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "OpenAI Codex warm-up account fan-out candidates: %d available, %d eligible",
                len(available),
                len(eligible),
            )
        return [
            account.account_id
            for account in eligible
            if isinstance(account.account_id, str) and account.account_id
        ]
    except Exception as exc:
        # Best-effort enumeration: warm-up must not fail because account
        # discovery did, so log and report "no accounts".
        if logger.isEnabledFor(logging.WARNING):
            logger.warning(
                "Failed to enumerate managed OAuth accounts for warm-up fan-out: %s",
                exc,
                exc_info=True,
            )
        return []
698+
650699
@property
651700
def _auth_credentials(self) -> dict[str, Any] | None:
652701
return getattr(self._credential_manager, "_auth_credentials", None)

src/connectors/openai_codex/credentials.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,7 +1100,7 @@ def validate_current_credentials(self) -> ValidationResult:
11001100
return ValidationResult.failure("No credentials loaded")
11011101
return self._validate_credentials_structure(self._auth_credentials)
11021102

1103-
def get_account_id(self) -> str | None:
1103+
def get_account_id(self) -> str | None:
11041104
"""Return ChatGPT account ID from loaded credentials.
11051105
11061106
Returns:
@@ -1148,7 +1148,15 @@ def get_account_id(self) -> str | None:
11481148
if isinstance(token_account_id, str) and token_account_id:
11491149
return token_account_id
11501150

1151-
return None
1151+
return None
1152+
1153+
async def list_managed_oauth_account_ids(self) -> list[str]:
    """List the account IDs the managed OAuth selector reports as eligible.

    Returns:
        An empty list when managed OAuth is not enabled; otherwise whatever
        the selector's ``list_eligible_account_ids`` yields after its
        accounts have been reloaded.
    """
    if self._managed_enabled():
        # Refresh the selector's view of accounts before asking for IDs.
        await self._managed_selector.reload_accounts()
        eligible_ids = await self._managed_selector.list_eligible_account_ids()
        return eligible_ids
    return []
11521160

11531161

11541162
def _extract_chatgpt_account_id_from_jwt(token: str) -> str | None:

src/connectors/openai_codex/interfaces.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ def get_account_id(self) -> str | None:
160160
"""
161161
...
162162

163+
@abstractmethod
async def list_managed_oauth_account_ids(self) -> list[str]:
    """List the managed OAuth account IDs eligible for warm-up fan-out.

    Returns:
        The eligible account IDs; implementations decide what "eligible"
        means and may return an empty list.
    """
    ...
167+
163168

164169
class IPayloadBuilder(ABC):
165170
"""Interface for building Codex payloads.

src/connectors/openai_codex/managed_oauth_selector.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,18 @@ def _available_accounts(
127127
if account.needs_reauth:
128128
continue
129129
available.append(account)
130-
eligible = [account for account in available if not account.is_rate_limited(now_ms)]
130+
eligible = [
131+
account for account in available if not account.is_rate_limited(now_ms)
132+
]
131133
return available, eligible
132134

135+
async def list_eligible_account_ids(self) -> list[str]:
    """Return the IDs of accounts that are eligible right now.

    Ensures accounts are loaded, then asks ``_available_accounts`` for the
    (available, eligible) pair at the current wall-clock time in
    milliseconds and projects the eligible half to its ``account_id`` values.
    """
    await self._ensure_accounts_loaded()
    current_ms = int(time.time() * 1000)
    _available, eligible_accounts = self._available_accounts(current_ms)
    account_ids = [entry.account_id for entry in eligible_accounts]
    return account_ids
141+
133142
def _select_by_strategy(
134143
self,
135144
eligible: list[ManagedOAuthAccount],
@@ -333,4 +342,3 @@ async def rotate_on_auth_failure(
333342
session_id=session_id,
334343
ignore_session_affinity=True,
335344
)
336-

src/core/app/application_builder.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,9 @@ async def lifespan(app: FastAPI): # type: ignore[no-untyped-def,no-any-return,m
630630
from src.core.services.backend_startup_disablement import (
631631
apply_backend_disablement_at_startup,
632632
)
633+
from src.core.services.usage_window_warmup_service import (
634+
UsageWindowWarmupService,
635+
)
633636

634637
app_config = service_provider.get_service(AppConfig)
635638
backend_lifecycle_manager = service_provider.get_service(
@@ -644,6 +647,9 @@ async def lifespan(app: FastAPI): # type: ignore[no-untyped-def,no-any-return,m
644647
)
645648

646649
routing_service = service_provider.get_service(BackendRoutingService)
650+
usage_window_warmup_service = service_provider.get_service(
651+
UsageWindowWarmupService
652+
)
647653
if routing_service is not None:
648654
active_routing_service = routing_service
649655

@@ -665,7 +671,7 @@ async def _startup_capability_refresh() -> None:
665671
)
666672

667673
refresh_interval_seconds = 0.0
668-
if app_config is not None and app_config.routing is not None:
674+
if app_config is not None:
669675
refresh_interval_seconds = (
670676
app_config.routing.capability_refresh_interval_seconds
671677
or 0.0
@@ -674,6 +680,8 @@ async def _startup_capability_refresh() -> None:
674680
_ = asyncio.create_task( # noqa: RUF006 - fire-and-forget
675681
active_routing_service.start_model_capability_refresh()
676682
)
683+
if usage_window_warmup_service is not None:
684+
await usage_window_warmup_service.start()
677685
except Exception as exc:
678686
if logger.isEnabledFor(logging.WARNING):
679687
logger.warning(
@@ -690,6 +698,31 @@ async def _startup_capability_refresh() -> None:
690698
logger.info("Shutting down application")
691699

692700
# Stop capability refresh loop before backend shutdown.
701+
try:
702+
from src.core.services.usage_window_warmup_service import (
703+
UsageWindowWarmupService,
704+
)
705+
706+
usage_window_warmup_service = service_provider.get_service(
707+
UsageWindowWarmupService
708+
)
709+
if usage_window_warmup_service is not None:
710+
await usage_window_warmup_service.stop()
711+
except (RuntimeError, AttributeError, asyncio.CancelledError) as exc:
712+
if logger.isEnabledFor(logging.WARNING):
713+
logger.warning(
714+
"Failed to stop usage window warm-up scheduler: %s",
715+
type(exc).__name__,
716+
exc_info=True,
717+
)
718+
except Exception as exc:
719+
if logger.isEnabledFor(logging.WARNING):
720+
logger.warning(
721+
"Failed to stop usage window warm-up scheduler: %s",
722+
type(exc).__name__,
723+
exc_info=True,
724+
)
725+
693726
try:
694727
from src.core.services.backend_routing_service import (
695728
BackendRoutingService,

src/core/app/stages/core_services.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,15 @@
1919

2020
from src.core.config.app_config import AppConfig
2121
from src.core.di.container import ServiceCollection
22+
from src.core.di.registrations._shared import register_singleton_if_absent
2223
from src.core.interfaces.application_state_interface import IApplicationState
23-
from src.core.interfaces.di_interface import IServiceProvider
24+
from src.core.interfaces.backend_completion_flow_interface import (
25+
IBackendCompletionFlow,
26+
)
27+
from src.core.interfaces.backend_lifecycle_manager_interface import (
28+
IBackendLifecycleManager,
29+
)
30+
from src.core.interfaces.di_interface import IServiceProvider
2431
from src.core.interfaces.response_parser_interface import IResponseParser
2532

2633
# from src.core.interfaces.secure_state_interface import ISecureStateService # Removed unresolved import
@@ -323,6 +330,9 @@ def secure_state_factory(provider: IServiceProvider) -> SecureStateService:
323330
# Register connection activity tracker (if enabled)
324331
self._register_activity_tracker(services, config)
325332

333+
# Register usage window warm-up scheduler (if enabled)
334+
self._register_usage_window_warmup_service(services, config)
335+
326336
# Register wire capture service
327337
self._register_wire_capture_service(services)
328338

@@ -475,6 +485,44 @@ def cleanup_scheduler_factory(
475485
if logger.isEnabledFor(logging.INFO):
476486
logger.info("Connection tracker cleanup scheduler registered")
477487

488+
def _register_usage_window_warmup_service(
    self, services: ServiceCollection, config: AppConfig
) -> None:
    """Register the usage window warm-up scheduler when it is enabled.

    Does nothing (beyond a debug log) when
    ``config.usage_window_warmup.enabled`` is falsy. Otherwise registers
    ``UsageWindowWarmupService`` through ``register_singleton_if_absent``
    with a factory that wires in the completion flow, the warm-up
    configuration and a ``WarmupTargetResolver``.

    Args:
        services: Service collection receiving the registration.
        config: Application configuration providing ``usage_window_warmup``.
    """
    scheduler_enabled = config.usage_window_warmup.enabled
    if not scheduler_enabled:
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Usage window warm-up scheduler disabled")
        return

    # Imported lazily, only once the feature is known to be enabled.
    from src.core.services.usage_window_warmup_service import (
        UsageWindowWarmupService,
    )

    def _build_warmup_service(
        provider: IServiceProvider,
    ) -> UsageWindowWarmupService:
        # The completion flow is mandatory; the lifecycle manager is looked
        # up with get_service and handed to the resolver as-is.
        flow = provider.get_required_service(
            IBackendCompletionFlow  # type: ignore[type-abstract]
        )
        lifecycle_manager = provider.get_service(
            IBackendLifecycleManager  # type: ignore[type-abstract]
        )
        from src.core.services.warmup_target_resolver import WarmupTargetResolver

        resolver = WarmupTargetResolver(lifecycle_manager)
        return UsageWindowWarmupService(
            completion_flow=flow,
            config=config.usage_window_warmup,
            target_resolver=resolver,
        )

    register_singleton_if_absent(
        services,
        UsageWindowWarmupService,
        implementation_factory=_build_warmup_service,
    )
    if logger.isEnabledFor(logging.INFO):
        logger.info("Usage window warm-up scheduler registered")
525+
478526
def _register_wire_capture_service(self, services: ServiceCollection) -> None:
479527
"""Register wire capture service.
480528

src/core/config/app_config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def save(self, path: str | Path) -> None:
107107
"usage_tracking",
108108
"replacement",
109109
"health_check",
110+
"usage_window_warmup",
110111
"failure_handling",
111112
"routing",
112113
"dynamic_compression",

src/core/config/models/app_config_model.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
)
4545
from src.core.domain.configuration.replacement_config import ReplacementConfig
4646
from src.core.domain.configuration.sandboxing_config import SandboxingConfiguration
47+
from src.core.domain.configuration.usage_window_warmup_config import (
48+
UsageWindowWarmupConfig,
49+
)
4750
from src.core.domain.model_utils import ModelDefaults
4851
from src.core.interfaces.configuration_interface import IConfig
4952
from src.core.interfaces.model_bases import DomainModel
@@ -104,6 +107,9 @@ class AppConfigModel(DomainModel, IConfig):
104107
end_of_session: EndOfSessionConfig = Field(default_factory=EndOfSessionConfig)
105108
replacement: ReplacementConfig = Field(default_factory=ReplacementConfig)
106109
health_check: HealthCheckConfig = Field(default_factory=HealthCheckConfig)
110+
usage_window_warmup: UsageWindowWarmupConfig = Field(
111+
default_factory=UsageWindowWarmupConfig
112+
)
107113
failure_handling: FailureHandlingConfig = Field(
108114
default_factory=FailureHandlingConfig
109115
)
@@ -141,18 +147,21 @@ class AppConfigModel(DomainModel, IConfig):
141147

142148
@field_validator("auto_append_first_prompt_filename", mode="before")
143149
@classmethod
144-
def validate_auto_append_first_prompt_filename(cls, v: str | None) -> str | None:
145-
if v is None or (isinstance(v, str) and not v.strip()):
150+
def validate_auto_append_first_prompt_filename(cls, v: Any) -> str | None:
151+
if v is None:
146152
return None
147153
if not isinstance(v, str):
148154
raise ValueError("auto_append_first_prompt_filename must be a string")
149-
suf = Path(v.strip()).suffix.lower()
155+
stripped = v.strip()
156+
if not stripped:
157+
return None
158+
suf = Path(stripped).suffix.lower()
150159
if suf not in (".txt", ".md"):
151160
raise ValueError(
152161
f"Invalid auto_append_first_prompt_filename {v!r}: "
153162
"must end with .txt or .md"
154163
)
155-
return v.strip()
164+
return stripped
156165

157166
def model_is_functional(self, model_id: str) -> bool:
158167
return self.backends.model_is_functional(model_id)
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""Configuration for scheduled sliding usage window warm-up requests."""
2+
3+
from __future__ import annotations
4+
5+
import re
6+
7+
from pydantic import ConfigDict, Field, field_validator
8+
9+
from src.core.domain.model_utils import (
10+
has_explicit_backend_selector,
11+
parse_model_backend,
12+
)
13+
from src.core.interfaces.model_bases import DomainModel
14+
15+
_TIME_PATTERN = re.compile(r"^(?:[01]\d|2[0-3]):[0-5]\d$")
16+
17+
18+
class UsageWindowWarmupEntryConfig(DomainModel):
    """One scheduled warm-up entry targeting a concrete backend:model route."""

    model_config = ConfigDict(frozen=True)

    # Explicit "backend:model" route; checked by validate_model.
    model: str = Field(...)
    # Time of day in HH:MM 24-hour form; checked by validate_time.
    time: str = Field(...)
    # Weekend opt-in flag, off by default (interpreted by the scheduler,
    # which is not part of this module).
    execute_on_weekend: bool = False

    @field_validator("model")
    @classmethod
    def validate_model(cls, value: str) -> str:
        """Validate and normalize the route; return it stripped of whitespace.

        Raises:
            ValueError: If the route is blank, contains the composite routing
                operators ``^`` or ``|``, lacks an explicit backend selector,
                or parses to an empty backend or model name.
        """
        route = value.strip()
        if not route:
            raise ValueError("usage window warm-up model must be provided")
        if any(operator in route for operator in ("^", "|")):
            raise ValueError(
                "usage window warm-up model cannot use composite routing operators"
            )
        if not has_explicit_backend_selector(route):
            raise ValueError(
                "usage window warm-up model must use an explicit backend:model route"
            )

        parsed = parse_model_backend(route)
        # Both halves of the route must be non-blank after parsing.
        if not (parsed.backend_type.strip() and parsed.model_name.strip()):
            raise ValueError(
                "usage window warm-up model must include a non-empty backend and model"
            )
        return route

    @field_validator("time")
    @classmethod
    def validate_time(cls, value: str) -> str:
        """Validate the HH:MM 24-hour time; return it stripped of whitespace.

        Raises:
            ValueError: If the stripped value does not match ``_TIME_PATTERN``.
        """
        candidate = value.strip()
        if _TIME_PATTERN.fullmatch(candidate) is None:
            raise ValueError("usage window warm-up time must use HH:MM 24-hour format")
        return candidate
56+
57+
58+
class UsageWindowWarmupConfig(DomainModel):
    """Root configuration of the usage window warm-up scheduler."""

    model_config = ConfigDict(frozen=True)

    # Master switch; off by default.
    enabled: bool = False
    # Scheduled warm-up entries; defaults to an empty list.
    entries: list[UsageWindowWarmupEntryConfig] = Field(default_factory=list)
65+
66+
67+
DEFAULT_USAGE_WINDOW_WARMUP_CONFIG = UsageWindowWarmupConfig()
68+
69+
70+
__all__ = [
71+
"DEFAULT_USAGE_WINDOW_WARMUP_CONFIG",
72+
"UsageWindowWarmupConfig",
73+
"UsageWindowWarmupEntryConfig",
74+
]

0 commit comments

Comments
 (0)