-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathauto_deploy.py
More file actions
287 lines (243 loc) · 10.7 KB
/
auto_deploy.py
File metadata and controls
287 lines (243 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
import asyncio
import logging
import time
from datetime import UTC, datetime
import httpx
from dateutil import parser
from docker.models.containers import Container
from cogstack_model_gateway.common.config.models import Config
from cogstack_model_gateway.common.containers import get_models
from cogstack_model_gateway.common.models import (
ModelDeploymentType,
ModelManager,
OnDemandModelConfig,
)
from cogstack_model_gateway.gateway.core.models import run_model_container
from cogstack_model_gateway.gateway.routers.utils import get_cms_url, resolve_model_host
# Module-level logger; every message emitted here uses the "cmg.gateway.auto_deploy" name.
log = logging.getLogger("cmg.gateway.auto_deploy")
# Stale lock threshold, in seconds. ensure_model_available compares the age of a
# not-yet-ready model entry against this value: a younger entry is treated as a
# deployment in progress (and waited on), an older one as a stale lock that is
# deleted so the deployment can be retried.
STALE_DEPLOYMENT_LOCK_SECONDS = 300  # 5 minutes
def is_model_running(model_name: str) -> bool:
    """Report whether a listed model container answers to ``model_name``.

    The candidates are whatever ``get_models(all=False, managed_only=False)``
    returns; each entry is matched on both its ``service_name`` and its
    ``ip_address``.

    Args:
        model_name: Service name or IP address to look for.

    Returns:
        True if ``model_name`` equals the service name or the IP address of at
        least one listed container, False otherwise.
    """
    active = get_models(all=False, managed_only=False)
    service_names = {entry["service_name"] for entry in active}
    ip_addresses = {entry["ip_address"] for entry in active}
    return model_name in service_names or model_name in ip_addresses
async def wait_for_model_health(model_name: str, timeout: int, check_interval: float = 1.0) -> bool:
    """Poll the model's ``readyz`` endpoint until it returns HTTP 200 or ``timeout`` elapses.

    The delay between attempts starts at ``check_interval`` and doubles after each
    attempt, capped at 8 seconds. Elapsed time is measured with a monotonic clock so
    that wall-clock adjustments cannot shorten or extend the wait, and each sleep is
    clamped to the time remaining so the call does not overshoot ``timeout`` by a
    whole backoff interval.

    Args:
        model_name: Name of the model; resolved to a host via ``resolve_model_host``.
        timeout: Maximum number of seconds to wait.
        check_interval: Initial delay in seconds between health checks.

    Returns:
        True if the endpoint answered with HTTP 200 before the deadline, False otherwise.
    """
    start_time = time.monotonic()
    deadline = start_time + timeout
    interval = check_interval

    log.info("Waiting for model '%s' to become ready (timeout: %ds)", model_name, timeout)
    url = get_cms_url(resolve_model_host(model_name), "readyz")

    # NOTE(review): TLS certificate verification is disabled for these probes;
    # confirm this is intentional for the deployment environment.
    async with httpx.AsyncClient(verify=False) as client:
        while time.monotonic() < deadline:
            try:
                response = await client.get(url, timeout=5.0)
                if response.status_code == 200:
                    elapsed = time.monotonic() - start_time
                    log.info("Model '%s' is ready (took %.1fs)", model_name, elapsed)
                    return True
                log.debug(
                    "Model '%s' not ready yet (status: %d), retrying in %.1fs",
                    model_name,
                    response.status_code,
                    interval,
                )
            except (httpx.RequestError, httpx.TimeoutException) as e:
                # Connection failures are expected while the container is still starting.
                log.debug(
                    "Health check failed for '%s': %s, retrying in %.1fs", model_name, e, interval
                )

            # Never sleep past the deadline: a full backoff step could otherwise
            # add up to 8 seconds on top of the requested timeout.
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            await asyncio.sleep(min(interval, remaining))
            # Exponential backoff, max 8 seconds
            interval = min(interval * 2, 8.0)

    elapsed = time.monotonic() - start_time
    log.warning(
        "Model '%s' did not become ready within %ds (elapsed: %.1fs)", model_name, timeout, elapsed
    )
    return False
def deploy_on_demand_model(
    model_config: OnDemandModelConfig, model_type: str, model_manager: ModelManager
) -> Container:
    """Register an on-demand model in the database and start its container.

    The database entry is created first with deployment type AUTO, then the
    container is started. If starting the container fails, the entry is deleted
    again and the error is re-raised. This function does not mark the model as
    ready; ``ensure_model_available`` does that after its health check passes.

    Args:
        model_config: Configuration for the on-demand model (name, URI, idle TTL
            and optional deploy settings).
        model_type: Model type forwarded to ``run_model_container``.
        model_manager: Model manager used for the database operations.

    Returns:
        The container returned by ``run_model_container``.

    Raises:
        ValueError: Expected from ``create_model`` when the model already exists
            in the database (another worker is deploying it).
        Exception: Whatever ``run_model_container`` raised, after the database
            entry has been removed.
    """
    name = model_config.model_name
    log.info("Starting deployment of on-demand model: %s", name)

    # Creating the entry up front is what signals "deployment in progress" to
    # other workers; it is expected to raise ValueError if the entry exists.
    model_manager.create_model(
        model_name=name,
        deployment_type=ModelDeploymentType.AUTO,
        idle_ttl=model_config.idle_ttl,
    )

    try:
        deploy_settings = model_config.deploy
        resources = deploy_settings.__dict__ if deploy_settings else None
        launched = run_model_container(
            model_name=name,
            model_uri=model_config.model_uri,
            model_type=model_type,
            deployment_type=ModelDeploymentType.AUTO,
            ttl=model_config.idle_ttl,
            resources=resources,
        )
        log.info("Successfully deployed container for model: %s", name)
        return launched
    except Exception as exc:
        # Roll back the database entry so a later request can retry the deployment.
        log.error("Failed to deploy model '%s': %s", name, exc)
        model_manager.delete_model(name)
        raise
async def _wait_and_mark_ready(
    model_name: str, config: Config, model_manager: ModelManager
) -> bool:
    """Wait for the model's health check and mark it ready in the database on success."""
    is_healthy = await wait_for_model_health(
        model_name,
        config.models.deployment.auto.health_check_timeout,
    )
    if is_healthy:
        model_manager.mark_model_ready(model_name)
    return is_healthy


def _ensure_model_tracked(model_name: str, model_manager: ModelManager) -> None:
    """Create a STATIC database entry for a healthy model that has no entry yet."""
    if model_manager.get_model(model_name):
        return
    log.info(
        "Model '%s' is running but not tracked, creating STATIC entry",
        model_name,
    )
    try:
        model_manager.create_model(
            model_name=model_name,
            deployment_type=ModelDeploymentType.STATIC,
            ready=True,
        )
    except ValueError:
        # Another worker just created it, that's fine
        log.debug("Model '%s' was created by another worker", model_name)


async def ensure_model_available(
    model_name: str,
    config: Config,
    model_manager: ModelManager,
) -> bool:
    """Ensure a model is available, deploying it if necessary.

    This is the main entry point for ensuring ANY model (STATIC, MANUAL, or AUTO)
    is available before processing a request. Flow:

    1. Check if the model container is running.
    2. Probe the model's ``readyz`` endpoint once.
    3. If both pass, make sure the model is tracked in the database (a STATIC
       entry is created when none exists) and return True.
    4. Otherwise look up an on-demand configuration; without one, return False.
    5. If a not-yet-ready database entry exists, either wait for that deployment
       (entry younger than ``STALE_DEPLOYMENT_LOCK_SECONDS``) or delete the entry
       as a stale lock.
    6. Resolve the model type and deploy the container.
    7. Wait for the model to become healthy; on failure stop and remove the
       container and delete the database entry.

    Note: This function does NOT record usage - that should be done AFTER the actual
    operation succeeds (e.g., after HTTP request completes successfully).

    Args:
        model_name: Name of the model to ensure is available.
        config: Global configuration.
        model_manager: Model manager for database operations.

    Returns:
        True if the model is available and ready, False otherwise.
    """
    # Steps 1-3: use the model as-is when it is already running and healthy.
    if is_model_running(model_name):
        log.debug("Model '%s' container is running, checking health", model_name)
        url = get_cms_url(resolve_model_host(model_name), "readyz")
        try:
            # NOTE(review): TLS certificate verification is disabled for this
            # probe; confirm this is intentional.
            async with httpx.AsyncClient(verify=False) as client:
                response = await client.get(url, timeout=5.0)
        except (httpx.RequestError, httpx.TimeoutException) as e:
            log.info("Model '%s' is running but health check failed: %s", model_name, e)
        else:
            if response.status_code == 200:
                log.debug("Model '%s' is running and healthy", model_name)
                _ensure_model_tracked(model_name, model_manager)
                return True
            log.info(
                "Model '%s' is running but not healthy (status: %d)",
                model_name,
                response.status_code,
            )
    else:
        log.info("Model '%s' is not currently running", model_name)

    # Step 4: Model not running/healthy - check if we can auto-deploy
    model_config = model_manager.get_on_demand_config(model_name)
    if not model_config:
        log.warning(
            "Model '%s' is not available and not configured for auto-deployment",
            model_name,
        )
        return False

    log.info("Model '%s' not available, initiating auto-deployment", model_name)

    # Step 5: Check if another worker is currently deploying this model
    existing_model = model_manager.get_model(model_name)
    if existing_model and not existing_model.ready:
        # NOTE(review): the subtraction below requires created_at to parse to a
        # timezone-aware datetime; confirm the stored format always carries an offset.
        created_at = parser.isoparse(existing_model.created_at)
        age_seconds = (datetime.now(UTC) - created_at).total_seconds()

        if age_seconds < STALE_DEPLOYMENT_LOCK_SECONDS:
            # Fresh deployment in progress - wait for it
            log.info(
                "Model '%s' is being deployed by another worker (age: %.1fs), waiting...",
                model_name,
                age_seconds,
            )
            # Marking ready here as well covers the case where the other worker
            # crashed after deploying but before updating the database.
            if await _wait_and_mark_ready(model_name, config, model_manager):
                return True
            log.error("Model '%s' deployment by another worker failed or timed out", model_name)
            return False

        # Stale lock - take over deployment
        log.warning(
            "Model '%s' has stale deployment lock (age: %.1fs), breaking lock",
            model_name,
            age_seconds,
        )
        model_manager.delete_model(model_name)

    # Step 6a: Resolve the model type. This lookup has its own error handling so
    # that a ValueError raised here is not mistaken for the "entry already
    # exists" race handled in step 6b.
    try:
        model_type = config.tracking_client.get_model_type(model_config.model_uri)
    except Exception as e:
        log.error("Failed to deploy model '%s': %s", model_name, e)
        return False
    if model_type is None:
        log.warning(
            "Could not determine model type for URI '%s', using default",
            model_config.model_uri,
        )
        model_type = "medcat_umls"

    # Step 6b: Deploy the model
    # NOTE(review): an existing entry with ready=True (e.g. its container has
    # gone away) also reaches this point; if create_model rejects it with
    # ValueError it is handled as a concurrent deployment. Confirm such entries
    # are cleaned up elsewhere.
    try:
        container = deploy_on_demand_model(model_config, model_type, model_manager)
    except ValueError as e:
        # Another worker just created the entry (race condition)
        log.info("Another worker started deploying '%s', waiting for completion: %s", model_name, e)
        return await _wait_and_mark_ready(model_name, config, model_manager)
    except Exception as e:
        log.error("Failed to deploy model '%s': %s", model_name, e)
        return False

    # Step 7: Wait for model to become healthy
    if await _wait_and_mark_ready(model_name, config, model_manager):
        log.info("Model '%s' is now available and ready", model_name)
        return True

    log.error("Model '%s' failed health check, cleaning up", model_name)
    try:
        container.stop()
        container.remove()
    except Exception as e:
        # Best-effort cleanup: the database entry is removed regardless.
        log.warning("Failed to clean up container for '%s': %s", model_name, e)
    model_manager.delete_model(model_name)
    return False