Add HTTP SSE CosyVoice TTS API and Omni createItem function #204
songguocola wants to merge 2 commits into
Code Review
This pull request introduces HTTP-based speech synthesis support, including both SSE streaming and non-SSE URL-based results, and adds tool calling functionality to the Qwen3 Omni realtime conversation. Additionally, it implements 60-second connection timeouts for WebSocket-based services. Review feedback highlights the need for consistent error handling in synthesis results, the importance of verifying connection status after timeouts to prevent silent failures, and concerns regarding memory management when using ThreadLocal storage or unconditional audio accumulation during streaming.
private HttpSpeechSynthesisResult convertNonSSEResult(DashScopeResult dashScopeResult) {
  HttpSpeechSynthesisResult result = new HttpSpeechSynthesisResult();
The convertNonSSEResult method lacks error checking for the API response. If the API returns an error (e.g., invalid parameters or API key issues), dashScopeResult.getCode() will be non-null, but this method will proceed to try and parse it as a successful result. This should be handled similarly to convertResult by throwing an ApiException.
private HttpSpeechSynthesisResult convertNonSSEResult(DashScopeResult dashScopeResult) {
  if (dashScopeResult.getCode() != null && !dashScopeResult.getCode().isEmpty()) {
    String errorMsg =
        dashScopeResult.getMessage() != null ? dashScopeResult.getMessage() : "Unknown error";
    Status status =
        Status.builder()
            .statusCode(
                dashScopeResult.getStatusCode() != null ? dashScopeResult.getStatusCode() : 400)
            .code(dashScopeResult.getCode())
            .message(errorMsg)
            .requestId(dashScopeResult.getRequestId())
            .build();
    throw new ApiException(status);
  }
  HttpSpeechSynthesisResult result = new HttpSpeechSynthesisResult();

if (!connectLatch.get().await(60, TimeUnit.SECONDS)) {
  throw new RuntimeException("Connection timed out after 60 seconds");
}
The connect() method waits for the connectLatch but does not verify if the connection was actually successful. Since onFailure also counts down the latch, the method might return normally even if the WebSocket failed to open. You should check the isOpen status after the latch is released.
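The failure mode can be reproduced in isolation. The following is a minimal sketch, not the SDK's code: `LatchConnectDemo`, `attempt`, and the simulated callbacks are hypothetical stand-ins, and only the `connectLatch` and `isOpen` names are taken from the review. It assumes, as described above, that both `onOpen` and `onFailure` count down the same latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LatchConnectDemo {
  // Hypothetical stand-ins for the fields referenced in the review; not the SDK's classes.
  private final CountDownLatch connectLatch = new CountDownLatch(1);
  private final AtomicBoolean isOpen = new AtomicBoolean(false);

  // Both callbacks release the latch, so the latch alone cannot tell success from failure.
  void onOpen() {
    isOpen.set(true);
    connectLatch.countDown();
  }

  void onFailure() {
    isOpen.set(false);
    connectLatch.countDown();
  }

  /** Returns normally only if the simulated socket opened; throws otherwise. */
  public void connect(boolean simulateFailure) throws InterruptedException {
    // Simulate the WebSocket library invoking one of the callbacks on another thread.
    Runnable callback = simulateFailure ? this::onFailure : this::onOpen;
    new Thread(callback).start();

    if (!connectLatch.await(60, TimeUnit.SECONDS)) {
      throw new IllegalStateException("Connection timed out after 60 seconds");
    }
    // Without this check, a failed connection would return as if it had succeeded.
    if (!isOpen.get()) {
      throw new IllegalStateException("Connection failed");
    }
  }

  /** Demo helper: reports how connect() ended. */
  public static String attempt(boolean simulateFailure) throws InterruptedException {
    try {
      new LatchConnectDemo().connect(simulateFailure);
      return "open";
    } catch (IllegalStateException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(attempt(false)); // prints "open"
    System.out.println(attempt(true));  // prints "Connection failed"
  }
}
```

Removing the `isOpen` check makes the second call return "open" as well, which is the silent failure the review describes.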
if (!connectLatch.get().await(60, TimeUnit.SECONDS)) {
  throw new RuntimeException("Connection timed out after 60 seconds");
}
if (!isOpen.get()) {
  throw new RuntimeException("Connection failed");
}

if (!connectLatch.get().await(60, TimeUnit.SECONDS)) {
  throw new RuntimeException("Connection timed out after 60 seconds");
}
Similar to OmniRealtimeConversation, the connect() method should verify that the connection is open after the latch is released, as onFailure also triggers the countdown. Proceeding with a failed connection will lead to subsequent errors when attempting to send messages.
if (!connectLatch.get().await(60, TimeUnit.SECONDS)) {
  throw new RuntimeException("Connection timed out after 60 seconds");
}
if (!isOpen.get()) {
  throw new RuntimeException("Connection failed");
}

  log.error("Error sending audio from file: {}", filePath, e);
}
//wait 5 seconds for demo response done
Thread.sleep(5 * 1000);
The sample program might exit or close the conversation before processing tool calls that arrive after the audio file has been fully read. The current 5-second sleep does not check for or handle pending tool calls that might be triggered by the model's final processing of the audio stream.
// wait for demo response done and handle any pending tool calls
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < 5000) {
  if (handlePendingToolCalls(conversation)) {
    conversation.createResponse(null, Arrays.asList(OmniRealtimeModality.AUDIO, OmniRealtimeModality.TEXT));
  }
  Thread.sleep(200);
}

@Slf4j
public class HttpSpeechSynthesizer {

  private final ThreadLocal<ByteBuffer> accumulatedAudioData = new ThreadLocal<>();
Using an instance-level ThreadLocal for accumulatedAudioData can lead to memory overhead and potential leaks in thread-pooled environments. Each instance of HttpSpeechSynthesizer creates a new ThreadLocal key. If many instances are created and used across a fixed set of threads, the ThreadLocalMap in those threads will accumulate many entries. Additionally, the thread-local nature of this data means it's only accessible from the specific thread that performed the synthesis, which can be confusing for users of the asynchronous streamCall API if they attempt to access it from a different thread. Consider refactoring this to store the accumulated data within the result object or the callback context.
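The visibility problem and the suggested alternative can be illustrated with a small sketch. Everything here is hypothetical (`AccumulatedAudioDemo`, `StreamResult`, `run`); it does not reflect the actual shape of the SDK's result or callback classes. It only shows that data written to a `ThreadLocal` on a worker thread is invisible to the caller, while data stored on a shared result object is not.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class AccumulatedAudioDemo {
  /** Hypothetical result holder; the SDK's result class may look different. */
  public static final class StreamResult {
    private final ByteArrayOutputStream audio = new ByteArrayOutputStream();

    synchronized void append(byte[] chunk) {
      audio.write(chunk, 0, chunk.length);
    }

    public synchronized byte[] audioBytes() {
      return audio.toByteArray();
    }
  }

  // Mirrors the pattern under review: data is bound to whichever thread wrote it.
  private static final ThreadLocal<ByteBuffer> THREAD_LOCAL_AUDIO = new ThreadLocal<>();

  /** Returns {bytes the caller sees via ThreadLocal, bytes the caller sees via the result}. */
  public static int[] run() throws InterruptedException {
    StreamResult result = new StreamResult();
    Thread worker = new Thread(() -> {
      byte[] chunk = {1, 2, 3};
      THREAD_LOCAL_AUDIO.set(ByteBuffer.wrap(chunk)); // visible only on this worker thread
      result.append(chunk);                            // visible to any thread holding result
    });
    worker.start();
    worker.join();

    ByteBuffer seenByCaller = THREAD_LOCAL_AUDIO.get(); // null: the caller never set it
    int viaThreadLocal = seenByCaller == null ? 0 : seenByCaller.remaining();
    return new int[] {viaThreadLocal, result.audioBytes().length};
  }

  public static void main(String[] args) throws InterruptedException {
    int[] seen = run();
    // prints "ThreadLocal: 0 bytes, result object: 3 bytes"
    System.out.println("ThreadLocal: " + seen[0] + " bytes, result object: " + seen[1] + " bytes");
  }
}
```

Holding the buffer on the result object also ties its lifetime to that object rather than to a pooled thread, so it becomes eligible for garbage collection as soon as the caller drops the result.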
accumulatedAudioData.remove();

SynchronizeHalfDuplexApi<HttpSpeechSynthesisParam> api = createApi(true);
ByteArrayOutputStream audioBuffer = new ByteArrayOutputStream();
The streamCall method unconditionally accumulates all received audio chunks into a ByteArrayOutputStream. This can lead to high memory usage or OutOfMemoryError for long synthesis tasks. Since the primary purpose of streamCall is to provide audio chunks in real-time via the callback, accumulation should be optional. Consider adding a configuration parameter to enable or disable audio accumulation.
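One possible shape for such an opt-in is sketched below. The class `ChunkSink` and the `accumulateAudio` flag are assumptions made for illustration; they are not existing SDK parameters. The sketch always forwards each chunk to the callback and only allocates and fills a buffer when accumulation was requested.

```java
import java.io.ByteArrayOutputStream;
import java.util.function.Consumer;

public class OptionalAccumulationDemo {
  /** Hypothetical chunk handler; the flag name is an assumption, not an SDK parameter. */
  public static final class ChunkSink {
    private final Consumer<byte[]> callback;
    private final ByteArrayOutputStream buffer; // null when accumulation is disabled

    public ChunkSink(Consumer<byte[]> callback, boolean accumulateAudio) {
      this.callback = callback;
      this.buffer = accumulateAudio ? new ByteArrayOutputStream() : null;
    }

    public void onChunk(byte[] chunk) {
      callback.accept(chunk); // real-time delivery always happens
      if (buffer != null) {
        buffer.write(chunk, 0, chunk.length); // retained only on request
      }
    }

    public int accumulatedSize() {
      return buffer == null ? 0 : buffer.size();
    }
  }

  /** Feeds the given number of fixed-size chunks; returns {bytes delivered, bytes retained}. */
  public static int[] feed(boolean accumulateAudio, int chunks, int chunkSize) {
    int[] delivered = {0};
    ChunkSink sink = new ChunkSink(c -> delivered[0] += c.length, accumulateAudio);
    for (int i = 0; i < chunks; i++) {
      sink.onChunk(new byte[chunkSize]);
    }
    return new int[] {delivered[0], sink.accumulatedSize()};
  }

  public static void main(String[] args) {
    int[] off = feed(false, 4, 1024);
    int[] on = feed(true, 4, 1024);
    System.out.println("off: delivered=" + off[0] + " retained=" + off[1]); // 4096 / 0
    System.out.println("on:  delivered=" + on[0] + " retained=" + on[1]);   // 4096 / 4096
  }
}
```

With the flag off, memory use stays bounded by the size of a single chunk regardless of how long the synthesis runs.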
No description provided.