Skip to content

Commit 6229b1f

Browse files
Add adaptive Mistral backoff and raise retry budget
1 parent 9e66fa7 commit 6229b1f

3 files changed

Lines changed: 226 additions & 13 deletions

File tree

app/src/main/kotlin/com/google/ai/sample/feature/multimodal/PhotoReasoningViewModel.kt

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ import com.google.ai.sample.webrtc.WebRTCSender
7171
import com.google.ai.sample.webrtc.SignalingClient
7272
import org.webrtc.IceCandidate
7373
import kotlin.math.max
74+
import kotlin.math.roundToLong
7475

7576
class PhotoReasoningViewModel(
7677
application: Application,
@@ -183,11 +184,11 @@ class PhotoReasoningViewModel(
183184
// to avoid re-executing already-executed commands
184185
private var incrementalCommandCount = 0
185186

186-
// Mistral rate limiting per API key (1.1 seconds between requests with same key)
187+
// Mistral rate limiting per API key (1.5 seconds between requests with same key)
187188
private val mistralNextAllowedRequestAtMsByKey = mutableMapOf<String, Long>()
188189
private var lastMistralTokenTimeMs = 0L
189190
private var lastMistralTokenKey: String? = null
190-
private val MISTRAL_MIN_INTERVAL_MS = 1100L
191+
private val MISTRAL_MIN_INTERVAL_MS = 1500L
191192

192193
// Accumulated full text during streaming for incremental command parsing
193194
private var streamingAccumulatedText = StringBuilder()
@@ -609,6 +610,7 @@ class PhotoReasoningViewModel(
609610
val currentModel = com.google.ai.sample.GenerativeAiViewModelFactory.getCurrentModel()
610611

611612
clearStaleErrorState()
613+
stopExecutionFlag.set(false)
612614

613615
// Check for Human Expert model
614616
if (currentModel == ModelOption.HUMAN_EXPERT) {
@@ -1139,11 +1141,36 @@ private fun reasonWithMistral(
11391141
mistralNextAllowedRequestAtMsByKey[key] = max(existing, nextAllowedAt)
11401142
}
11411143

1144+
fun markKeyCooldown(key: String, referenceTimeMs: Long, extraDelayMs: Long) {
1145+
val normalizedExtraDelay = extraDelayMs.coerceAtLeast(0L)
1146+
val nextAllowedAt = referenceTimeMs + max(MISTRAL_MIN_INTERVAL_MS, normalizedExtraDelay)
1147+
val existing = mistralNextAllowedRequestAtMsByKey[key] ?: 0L
1148+
mistralNextAllowedRequestAtMsByKey[key] = max(existing, nextAllowedAt)
1149+
}
1150+
11421151
fun remainingWaitForKeyMs(key: String, nowMs: Long): Long {
11431152
val nextAllowedAt = mistralNextAllowedRequestAtMsByKey[key] ?: 0L
11441153
return (nextAllowedAt - nowMs).coerceAtLeast(0L)
11451154
}
11461155

1156+
fun parseRetryAfterMs(headerValue: String?): Long? {
1157+
if (headerValue.isNullOrBlank()) return null
1158+
val seconds = headerValue.trim().toDoubleOrNull() ?: return null
1159+
return (seconds * 1000.0).roundToLong().coerceAtLeast(0L)
1160+
}
1161+
1162+
fun parseRateLimitResetDelayMs(response: okhttp3.Response, nowMs: Long): Long? {
1163+
val resetHeader = response.header("x-ratelimit-reset") ?: return null
1164+
val resetEpochSeconds = resetHeader.trim().toLongOrNull() ?: return null
1165+
val resetMs = resetEpochSeconds * 1000L
1166+
return (resetMs - nowMs).coerceAtLeast(0L)
1167+
}
1168+
1169+
fun adaptiveRetryDelayMs(failureCount: Int): Long {
1170+
val cappedExponent = (failureCount - 1).coerceIn(0, 5)
1171+
return 1000L shl cappedExponent // 1s, 2s, 4s, 8s, 16s, 32s
1172+
}
1173+
11471174
fun isRetryableMistralFailure(code: Int): Boolean {
11481175
return code == 429 || code >= 500
11491176
}
@@ -1153,7 +1180,7 @@ private fun reasonWithMistral(
11531180
var consecutiveFailures = 0
11541181
var blockedKeysThisRound = mutableSetOf<String>()
11551182

1156-
val maxAttempts = availableKeys.size * 2 + 3 // Allow cycling through all keys at least twice
1183+
val maxAttempts = availableKeys.size * 4 + 8
11571184
while (response == null && consecutiveFailures < maxAttempts) {
11581185
if (stopExecutionFlag.get()) break
11591186

@@ -1175,7 +1202,10 @@ private fun reasonWithMistral(
11751202
try {
11761203
val attemptResponse = client.newCall(buildRequest(selectedKey)).execute()
11771204
val requestEndMs = System.currentTimeMillis()
1178-
markKeyCooldown(selectedKey, requestEndMs)
1205+
val retryAfterMs = parseRetryAfterMs(attemptResponse.header("Retry-After"))
1206+
val resetDelayMs = parseRateLimitResetDelayMs(attemptResponse, requestEndMs)
1207+
val serverRequestedDelayMs = max(retryAfterMs ?: 0L, resetDelayMs ?: 0L)
1208+
markKeyCooldown(selectedKey, requestEndMs, serverRequestedDelayMs)
11791209

11801210
if (attemptResponse.isSuccessful) {
11811211
response = attemptResponse
@@ -1192,39 +1222,46 @@ private fun reasonWithMistral(
11921222
attemptResponse.close()
11931223
blockedKeysThisRound.add(selectedKey)
11941224
consecutiveFailures++
1225+
val adaptiveDelay = adaptiveRetryDelayMs(consecutiveFailures)
1226+
markKeyCooldown(
1227+
selectedKey,
1228+
requestEndMs,
1229+
max(serverRequestedDelayMs, adaptiveDelay)
1230+
)
11951231
withContext(Dispatchers.Main) {
11961232
replaceAiMessageText(
1197-
"Mistral temporär nicht verfügbar (Versuch $consecutiveFailures/$maxAttempts). Wiederhole...",
1233+
"Mistral temporär nicht verfügbar (Versuch $consecutiveFailures/$maxAttempts). Warte auf Server-Rate-Limit und wiederhole...",
11981234
isPending = true
11991235
)
12001236
}
12011237
} catch (e: IOException) {
12021238
val requestEndMs = System.currentTimeMillis()
1203-
markKeyCooldown(selectedKey, requestEndMs)
1239+
val adaptiveDelay = adaptiveRetryDelayMs(consecutiveFailures + 1)
1240+
markKeyCooldown(selectedKey, requestEndMs, adaptiveDelay)
12041241
blockedKeysThisRound.add(selectedKey)
12051242
consecutiveFailures++
1206-
if (consecutiveFailures >= 5) {
1207-
throw IOException("Mistral request failed after 5 attempts: ${e.message}", e)
1243+
if (consecutiveFailures >= maxAttempts) {
1244+
throw IOException("Mistral request failed after $maxAttempts attempts: ${e.message}", e)
12081245
}
12091246
withContext(Dispatchers.Main) {
12101247
replaceAiMessageText(
1211-
if (consecutiveFailures >= maxAttempts) {
1212-
throw IOException("Mistral request failed after $maxAttempts attempts: ${e.message}", e)
1248+
"Mistral Netzwerkfehler (Versuch $consecutiveFailures/$maxAttempts). Wiederhole...",
1249+
isPending = true
12131250
)
12141251
}
12151252
}
1216-
"Mistral Netzwerkfehler (Versuch $consecutiveFailures/$maxAttempts). Wiederhole...",
1253+
}
12171254

12181255
if (stopExecutionFlag.get()) {
12191256
throw IOException("Mistral request aborted.")
12201257
}
12211258

1222-
val finalResponse = response ?: throw IOException("Mistral request failed after 5 attempts.")
1259+
val finalResponse = response ?: throw IOException("Mistral request failed after $maxAttempts attempts.")
12231260

12241261
if (!finalResponse.isSuccessful) {
12251262
val errBody = finalResponse.body?.string()
12261263
finalResponse.close()
1227-
val finalResponse = response ?: throw IOException("Mistral request failed after $maxAttempts attempts.")
1264+
throw IOException("Mistral Error ${finalResponse.code}: $errBody")
12281265
}
12291266

12301267
val body = finalResponse.body ?: throw IOException("Empty response body from Mistral")

scripts/mistral_cooldown_probe.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
#!/usr/bin/env python3
2+
import json
3+
import subprocess
4+
import time
5+
from typing import Tuple, List
6+
7+
MISTRAL_API_KEY = "[REDACTED — a real API key was committed here; it must be rotated and loaded from an environment variable (e.g. os.environ['MISTRAL_API_KEY']) instead of being hardcoded]"
8+
MISTRAL_MODEL = "mistral-large-latest"
9+
MISTRAL_ENDPOINT = "https://api.mistral.ai/v1/chat/completions"
10+
11+
12+
def now_ms() -> int:
13+
return int(time.time() * 1000)
14+
15+
16+
def curl_chat(payload: dict, stream: bool) -> Tuple[int, int, int]:
17+
"""
18+
Returns: (http_code, request_started_ms, last_token_ms_or_response_end_ms)
19+
For non-stream requests, 3rd value is response-end timestamp.
20+
"""
21+
request_started = now_ms()
22+
cmd = [
23+
"curl",
24+
"-sS",
25+
"-X",
26+
"POST",
27+
MISTRAL_ENDPOINT,
28+
"-H",
29+
"Content-Type: application/json",
30+
"-H",
31+
f"Authorization: Bearer {MISTRAL_API_KEY}",
32+
"--data-binary",
33+
json.dumps(payload),
34+
"-w",
35+
"\nHTTP_STATUS:%{http_code}\n",
36+
]
37+
if stream:
38+
cmd.insert(1, "-N")
39+
40+
proc = subprocess.Popen(
41+
cmd,
42+
stdout=subprocess.PIPE,
43+
stderr=subprocess.STDOUT,
44+
text=True,
45+
bufsize=1,
46+
)
47+
48+
last_token_ms = request_started
49+
http_code = 0
50+
assert proc.stdout is not None
51+
for line in proc.stdout:
52+
line = line.rstrip("\n")
53+
if line.startswith("data:"):
54+
data = line[5:].strip()
55+
if data and data != "[DONE]":
56+
last_token_ms = now_ms()
57+
elif line.startswith("HTTP_STATUS:"):
58+
try:
59+
http_code = int(line.split(":", 1)[1].strip())
60+
except ValueError:
61+
http_code = 0
62+
63+
exit_code = proc.wait()
64+
if exit_code != 0:
65+
raise RuntimeError(f"curl failed with exit code {exit_code}")
66+
67+
if not stream:
68+
last_token_ms = now_ms()
69+
return http_code, request_started, last_token_ms
70+
71+
72+
def sleep_until(target_ms: int) -> None:
73+
remaining = target_ms - now_ms()
74+
if remaining > 0:
75+
time.sleep(remaining / 1000.0)
76+
77+
78+
def probe_last_token_mode(delays: List[int]) -> None:
79+
print("=== PROBE: ab_letztem_token ===")
80+
min_success = None
81+
for delay in delays:
82+
stream_payload = {
83+
"model": MISTRAL_MODEL,
84+
"messages": [{"role": "user", "content": "Sag nur OK."}],
85+
"max_tokens": 32,
86+
"stream": True,
87+
}
88+
code, _, last_token = curl_chat(stream_payload, stream=True)
89+
if code != 200:
90+
print(f"baseline_stream_failed http={code}")
91+
continue
92+
93+
sleep_until(last_token + delay)
94+
probe_payload = {
95+
"model": MISTRAL_MODEL,
96+
"messages": [{"role": "user", "content": "OK?"}],
97+
"max_tokens": 1,
98+
"stream": False,
99+
}
100+
probe_code, _, _ = curl_chat(probe_payload, stream=False)
101+
print(f"delay={delay}ms http={probe_code}")
102+
if min_success is None and probe_code == 200:
103+
min_success = delay
104+
print(f"min_success_delay_ms={min_success}")
105+
print()
106+
107+
108+
def probe_request_start_mode(delays: List[int]) -> None:
109+
print("=== PROBE: ab_request_start ===")
110+
min_success = None
111+
for delay in delays:
112+
baseline_payload = {
113+
"model": MISTRAL_MODEL,
114+
"messages": [{"role": "user", "content": "Sag nur OK."}],
115+
"max_tokens": 32,
116+
"stream": True,
117+
}
118+
request_started = now_ms()
119+
baseline_cmd = [
120+
"curl",
121+
"-sS",
122+
"-N",
123+
"-X",
124+
"POST",
125+
MISTRAL_ENDPOINT,
126+
"-H",
127+
"Content-Type: application/json",
128+
"-H",
129+
f"Authorization: Bearer {MISTRAL_API_KEY}",
130+
"--data-binary",
131+
json.dumps(baseline_payload),
132+
"-w",
133+
"\nHTTP_STATUS:%{http_code}\n",
134+
]
135+
baseline_proc = subprocess.Popen(
136+
baseline_cmd,
137+
stdout=subprocess.PIPE,
138+
stderr=subprocess.STDOUT,
139+
text=True,
140+
bufsize=1,
141+
)
142+
143+
sleep_until(request_started + delay)
144+
probe_payload = {
145+
"model": MISTRAL_MODEL,
146+
"messages": [{"role": "user", "content": "OK?"}],
147+
"max_tokens": 1,
148+
"stream": False,
149+
}
150+
probe_code, _, _ = curl_chat(probe_payload, stream=False)
151+
print(f"delay={delay}ms http={probe_code}")
152+
if min_success is None and probe_code == 200:
153+
min_success = delay
154+
155+
baseline_output, _ = baseline_proc.communicate()
156+
baseline_status = 0
157+
for line in baseline_output.splitlines():
158+
if line.startswith("HTTP_STATUS:"):
159+
try:
160+
baseline_status = int(line.split(":", 1)[1].strip())
161+
except ValueError:
162+
baseline_status = 0
163+
if baseline_status != 200:
164+
print(f"baseline_stream_failed http={baseline_status}")
165+
print(f"min_success_delay_ms={min_success}")
166+
print()
167+
168+
169+
if __name__ == "__main__":
170+
step_delays = list(range(100, 3001, 100))
171+
probe_last_token_mode(step_delays)
172+
probe_request_start_mode(step_delays)

scripts/mistral_cooldown_probe.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/usr/bin/env bash
2+
set -euo pipefail
3+
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
4+
exec python3 "$SCRIPT_DIR/mistral_cooldown_probe.py"

0 commit comments

Comments (0)