
Commit b96e592

added sglang deployment example
1 parent 7ecc174 commit b96e592

2 files changed: 326 additions & 4 deletions


examples/container_deployments.py renamed to examples/containers/container_deployments.py

Lines changed: 5 additions & 4 deletions
@@ -6,7 +6,6 @@
 
 import os
 import time
-from typing import Optional
 
 from datacrunch import DataCrunchClient
 from datacrunch.exceptions import APIException
@@ -22,6 +21,8 @@
     VolumeMount,
     ContainerRegistrySettings,
     Deployment,
+    VolumeMountType,
+    ContainerDeploymentStatus,
 )
 
 # Configuration constants
@@ -44,8 +45,8 @@ def wait_for_deployment_health(client: DataCrunchClient, deployment_name: str, m
     for attempt in range(max_attempts):
         try:
             status = client.containers.get_status(deployment_name)
-            print(f"Deployment status: {status['status']}")
-            if status['status'] == 'healthy':
+            print(f"Deployment status: {status}")
+            if status == ContainerDeploymentStatus.HEALTHY:
                 return True
             time.sleep(delay)
         except APIException as e:
@@ -88,7 +89,7 @@ def main() -> None:
             ),
             volume_mounts=[
                 VolumeMount(
-                    type="scratch",
+                    type=VolumeMountType.SCRATCH,
                     mount_path="/data"
                 )
             ]
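
Note on the renamed file above: the example now compares against the SDK's ContainerDeploymentStatus and VolumeMountType enums instead of bare strings. A minimal sketch of the status-polling pattern in isolation, using the same client calls as the diff; the deployment name, attempt count, and delay here are illustrative placeholders only:

import os
import time

from datacrunch import DataCrunchClient
from datacrunch.containers.containers import ContainerDeploymentStatus

# Hypothetical deployment name, used only for illustration.
DEPLOYMENT_NAME = "my-deployment"

client = DataCrunchClient(
    os.environ["DATACRUNCH_CLIENT_ID"],
    os.environ["DATACRUNCH_CLIENT_SECRET"],
)

# Poll the deployment and compare against the enum member rather than the
# string 'healthy', mirroring wait_for_deployment_health in the diff above.
for attempt in range(10):
    status = client.containers.get_status(DEPLOYMENT_NAME)
    print(f"Attempt {attempt + 1}: deployment status is {status}")
    if status == ContainerDeploymentStatus.HEALTHY:
        print("Deployment is healthy")
        break
    time.sleep(30)
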
Lines changed: 321 additions & 0 deletions
@@ -0,0 +1,321 @@
"""Example script demonstrating SGLang model deployment using the DataCrunch API.

This script provides an example of deploying an SGLang server with the
deepseek-ai/deepseek-llm-7b-chat model, including creation, monitoring,
testing, and cleanup.
"""

import os
import time
import signal
import sys
import requests

from datacrunch import DataCrunchClient
from datacrunch.exceptions import APIException
from datacrunch.containers.containers import (
    Container,
    ComputeResource,
    ScalingOptions,
    ScalingPolicy,
    ScalingTriggers,
    QueueLoadScalingTrigger,
    UtilizationScalingTrigger,
    HealthcheckSettings,
    EntrypointOverridesSettings,
    EnvVar,
    EnvVarType,
    ContainerRegistrySettings,
    Deployment,
    ContainerDeploymentStatus,
)

# Configuration constants
DEPLOYMENT_NAME = "sglang-deployment-tutorial"
CONTAINER_NAME = "sglang-server"
MODEL_PATH = "deepseek-ai/deepseek-llm-7b-chat"
HF_SECRET_NAME = "huggingface-token"
IMAGE_URL = "docker.io/lmsysorg/sglang:v0.4.1.post6-cu124"

# Environment variables
DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID')
DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET')
HF_TOKEN = os.environ.get('HF_TOKEN')
INFERENCE_API_KEY = os.environ.get('INFERENCE_API_KEY')
CONTAINERS_API_URL = f'https://containers.datacrunch.io/{DEPLOYMENT_NAME}'

# DataCrunch client instance (global for graceful shutdown)
datacrunch = None


def wait_for_deployment_health(client: DataCrunchClient, deployment_name: str, max_attempts: int = 20, delay: int = 30) -> bool:
    """Wait for deployment to reach healthy status.

    Args:
        client: DataCrunch API client
        deployment_name: Name of the deployment to check
        max_attempts: Maximum number of status checks
        delay: Delay between checks in seconds

    Returns:
        bool: True if deployment is healthy, False otherwise
    """
    print("Waiting for deployment to be healthy (may take several minutes to download model)...")
    for attempt in range(max_attempts):
        try:
            status = client.containers.get_status(deployment_name)
            print(f"Attempt {attempt+1}/{max_attempts} - Deployment status: {status}")
            if status == ContainerDeploymentStatus.HEALTHY:
                return True
            time.sleep(delay)
        except APIException as e:
            print(f"Error checking deployment status: {e}")
            return False
    return False


def cleanup_resources(client: DataCrunchClient) -> None:
    """Clean up all created resources.

    Args:
        client: DataCrunch API client
    """
    try:
        # Delete deployment
        client.containers.delete(DEPLOYMENT_NAME)
        print("Deployment deleted")
    except APIException as e:
        print(f"Error during cleanup: {e}")


def graceful_shutdown(signum, frame) -> None:
    """Handle graceful shutdown on signals."""
    print(f"\nSignal {signum} received, cleaning up resources...")
    try:
        cleanup_resources(datacrunch)
    except Exception as e:
        print(f"Error during cleanup: {e}")
    sys.exit(0)


def test_deployment(base_url: str, api_key: str) -> None:
    """Test the deployment with a simple request.

    Args:
        base_url: The base URL of the deployment
        api_key: The API key for authentication
    """
    # First, check if the model info endpoint is working
    model_info_url = f"{base_url}/get_model_info"
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json'
    }

    try:
        print("\nTesting /get_model_info endpoint...")
        response = requests.get(model_info_url, headers=headers)
        if response.status_code == 200:
            print("Model info endpoint is working!")
            print(f"Response: {response.json()}")
        else:
            print(f"Request failed with status code {response.status_code}")
            print(f"Response: {response.text}")
            return

        # Now test completions endpoint
        print("\nTesting completions API with streaming...")
        completions_url = f"{base_url}/v1/completions"

        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {api_key}',
            'Accept': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
        }

        data = {
            "model": MODEL_PATH,
            "prompt": "Solar wind is a curious phenomenon. Tell me more about it",
            "max_tokens": 128,
            "temperature": 0.7,
            "top_p": 0.9,
            "stream": True
        }

        with requests.post(completions_url, headers=headers, json=data, stream=True) as response:
            if response.status_code == 200:
                print("Stream started. Receiving first 5 events...\n")
                for i, line in enumerate(response.iter_lines(decode_unicode=True)):
                    if line:
                        print(line)
                        if i >= 4:  # Only show first 5 events
                            print("...(response continues)...")
                            break
            else:
                print(f"Request failed with status code {response.status_code}")
                print(f"Response: {response.text}")

    except requests.RequestException as e:
        print(f"An error occurred: {e}")


def main() -> None:
    """Main function demonstrating SGLang deployment."""
    try:
        # Check required environment variables
        if not DATACRUNCH_CLIENT_ID or not DATACRUNCH_CLIENT_SECRET:
            print("Please set DATACRUNCH_CLIENT_ID and DATACRUNCH_CLIENT_SECRET environment variables")
            return

        if not HF_TOKEN:
            print("Please set HF_TOKEN environment variable with your Hugging Face token")
            return

        # Initialize client
        global datacrunch
        datacrunch = DataCrunchClient(DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET)

        # Register signal handlers for cleanup
        signal.signal(signal.SIGINT, graceful_shutdown)
        signal.signal(signal.SIGTERM, graceful_shutdown)

        # Create a secret for the Hugging Face token
        print(f"Creating secret for Hugging Face token: {HF_SECRET_NAME}")
        try:
            # Check if secret already exists
            existing_secrets = datacrunch.containers.get_secrets()
            secret_exists = any(
                secret.name == HF_SECRET_NAME for secret in existing_secrets)

            if not secret_exists:
                datacrunch.containers.create_secret(HF_SECRET_NAME, HF_TOKEN)
                print(f"Secret '{HF_SECRET_NAME}' created successfully")
            else:
                print(f"Secret '{HF_SECRET_NAME}' already exists, using existing secret")
        except APIException as e:
            print(f"Error creating secret: {e}")
            return

        # Create container configuration
        container = Container(
            name=CONTAINER_NAME,
            image=IMAGE_URL,
            exposed_port=30000,
            healthcheck=HealthcheckSettings(
                enabled=True,
                port=30000,
                path="/health"
            ),
            entrypoint_overrides=EntrypointOverridesSettings(
                enabled=True,
                cmd=["python3", "-m", "sglang.launch_server", "--model-path",
                     MODEL_PATH, "--host", "0.0.0.0", "--port", "30000"]
            ),
            env=[
                EnvVar(
                    name="HF_TOKEN",
                    value_or_reference_to_secret=HF_SECRET_NAME,
                    type=EnvVarType.SECRET
                )
            ]
        )

        # Create scaling configuration - default values
        scaling_options = ScalingOptions(
            min_replica_count=1,
            max_replica_count=2,
            scale_down_policy=ScalingPolicy(delay_seconds=300),
            scale_up_policy=ScalingPolicy(delay_seconds=300),
            queue_message_ttl_seconds=500,
            concurrent_requests_per_replica=1,
            scaling_triggers=ScalingTriggers(
                queue_load=QueueLoadScalingTrigger(threshold=1),
                cpu_utilization=UtilizationScalingTrigger(
                    enabled=True,
                    threshold=90
                ),
                gpu_utilization=UtilizationScalingTrigger(
                    enabled=True,
                    threshold=90
                )
            )
        )

        # Create registry and compute settings
        registry_settings = ContainerRegistrySettings(is_private=False)
        # For a 7B model, General Compute (24GB VRAM) is sufficient
        compute = ComputeResource(name="General Compute", size=1)

        # Create deployment object
        deployment = Deployment(
            name=DEPLOYMENT_NAME,
            container_registry_settings=registry_settings,
            containers=[container],
            compute=compute,
            scaling=scaling_options,
            is_spot=False
        )

        # Create the deployment
        created_deployment = datacrunch.containers.create(deployment)
        print(f"Created deployment: {created_deployment.name}")
        print("This will take several minutes while the model is downloaded and the server starts...")

        # Wait for deployment to be healthy
        if not wait_for_deployment_health(datacrunch, DEPLOYMENT_NAME):
            print("Deployment health check failed")
            cleanup_resources(datacrunch)
            return

        # Get the deployment endpoint URL and inference API key
        containers_api_url = CONTAINERS_API_URL
        inference_api_key = INFERENCE_API_KEY

        # If not provided as environment variables, prompt the user
        if not containers_api_url:
            containers_api_url = input("Enter your Containers API URL from the DataCrunch dashboard: ")
        else:
            print(f"Using Containers API URL from environment: {containers_api_url}")

        if not inference_api_key:
            inference_api_key = input("Enter your Inference API Key from the DataCrunch dashboard: ")
        else:
            print("Using Inference API Key from environment")

        # Test the deployment
        if containers_api_url and inference_api_key:
            print("\nTesting the deployment...")
            test_deployment(containers_api_url, inference_api_key)

        # Cleanup or keep running based on user input
        keep_running = input("\nDo you want to keep the deployment running? (y/n): ")
        if keep_running.lower() != 'y':
            cleanup_resources(datacrunch)
        else:
            print(f"Deployment {DEPLOYMENT_NAME} is running. Don't forget to delete it when finished.")
            print("You can delete it from the DataCrunch dashboard or by running:")
            print(f"datacrunch.containers.delete('{DEPLOYMENT_NAME}')")

    except Exception as e:
        print(f"Unexpected error: {e}")
        # Attempt cleanup even if there was an error
        try:
            cleanup_resources(datacrunch)
        except Exception as cleanup_error:
            print(f"Error during cleanup after failure: {cleanup_error}")


if __name__ == "__main__":
    main()
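
Once the deployment reports healthy, the endpoint can also be queried outside the script. Below is a minimal non-streaming sketch against the same /v1/completions route used in test_deployment, assuming the OpenAI-compatible response shape; the Containers API URL and Inference API key are placeholders taken from the environment:

import os

import requests

# Placeholders: take these values from the DataCrunch dashboard, as in the script above.
BASE_URL = os.environ.get(
    "CONTAINERS_API_URL", "https://containers.datacrunch.io/sglang-deployment-tutorial")
API_KEY = os.environ["INFERENCE_API_KEY"]

response = requests.post(
    f"{BASE_URL}/v1/completions",
    headers={
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    },
    json={
        "model": "deepseek-ai/deepseek-llm-7b-chat",
        "prompt": "Explain the solar wind in two sentences.",
        "max_tokens": 64,
        "temperature": 0.7,
        # Omitting "stream" returns the whole completion in a single JSON body.
    },
    timeout=120,
)
response.raise_for_status()
# OpenAI-compatible completions responses carry the text under choices[0]["text"].
print(response.json()["choices"][0]["text"])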
