Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.alibaba.dashscope.exception.InputRequiredException;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.protocol.*;
import com.alibaba.dashscope.utils.JsonUtils;
import com.google.gson.JsonObject;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Emitter;
Expand Down Expand Up @@ -44,7 +45,7 @@ public final class SpeechSynthesizer {
SynchronizeFullDuplexApi<SpeechSynthesisParam> duplexApi;

private ApiServiceOption serviceOption;
private Emitter<String> textEmitter;
private Emitter<TextStreamItem> textEmitter;
private ResultCallback<SpeechSynthesisResult> callback;
private SpeechSynthesisState state = SpeechSynthesisState.IDLE;

Expand Down Expand Up @@ -235,10 +236,11 @@ public Flowable<SpeechSynthesisResult> streamingCallAsFlowable(Flowable<String>
startStreamTimeStamp = System.currentTimeMillis();
recvAudioLength = 0;
preRequestId = UUID.randomUUID().toString();
Flowable<TextStreamItem> inputStream = textStream.map(text -> new TextStreamItem(text));
return duplexApi
.duplexCall(
StreamInputTtsParamWithStream.fromStreamInputTtsParam(
this.parameters, textStream, preRequestId, false))
this.parameters, inputStream, preRequestId, false))
.filter(item -> item.getEvent() != WebSocketEventType.TASK_STARTED.getValue())
.map(SpeechSynthesisResult::fromDashScopeResult)
.filter(item -> !canceled.get())
Expand Down Expand Up @@ -284,7 +286,7 @@ public Flowable<SpeechSynthesisResult> callAsFlowable(String text)
emitter -> {
new Thread(
() -> {
emitter.onNext(text);
emitter.onNext(new TextStreamItem(text));
emitter.onComplete();
})
.start();
Expand Down Expand Up @@ -336,7 +338,7 @@ private void startStream(boolean enableSsml) {
// timestamps.clear();
WritableByteChannel channel = Channels.newChannel(outputStream);

Flowable<String> textFrames =
Flowable<TextStreamItem> textFrames =
Flowable.create(
emitter -> {
synchronized (SpeechSynthesizer.this) {
Expand All @@ -346,7 +348,7 @@ private void startStream(boolean enableSsml) {
emitter.onComplete();
return;
} else {
emitter.onNext(buffer.text);
emitter.onNext(buffer.item);
}
}
cmdBuffer.clear();
Expand Down Expand Up @@ -496,7 +498,7 @@ public void onError(Exception e) {
* @param text utf-8 encoded text
*/
private void submitText(String text) {
if (Objects.equals(text, "")) {
if (text == null || text.isEmpty()) {
throw new ApiException(new InputRequiredException("Parameter invalid: text is null"));
}
synchronized (this) {
Expand All @@ -507,10 +509,10 @@ private void submitText(String text) {
}
if (textEmitter == null) {
log.debug("submitText to new emitter: " + text);
cmdBuffer.add(AsyncCmdBuffer.builder().text(text).build());
cmdBuffer.add(AsyncCmdBuffer.builder().item(new TextStreamItem(text)).build());
} else {
log.debug("submitText to emitter: " + text);
textEmitter.onNext(text);
textEmitter.onNext(new TextStreamItem(text));
}
}
}
Expand Down Expand Up @@ -660,6 +662,27 @@ public void streamingCall(String text) {
}
}

public void streamingFlush() {
streamingFlush(null);
}

public void streamingFlush(JsonObject params) {
synchronized (this) {
if (state != SpeechSynthesisState.TTS_STARTED) {
throw new ApiException(
new InputRequiredException(
"State invalid: expect stream input tts state is started but " + state.getValue()));
}
if (textEmitter == null) {
log.debug("submitFlush to new emitter");
cmdBuffer.add(AsyncCmdBuffer.builder().item(new TextStreamItem(true, params)).build());
} else {
log.debug("submitFlush to emitter");
textEmitter.onNext(new TextStreamItem(true, params));
}
}
}

/**
* Speech synthesis If a callback is set, the audio will be returned in real-time through the
* on_event interface Otherwise, this function blocks until all audio is received and then returns
Expand Down Expand Up @@ -750,17 +773,39 @@ public ByteBuffer call(String text) {
@SuperBuilder
private static class AsyncCmdBuffer {
@Builder.Default private boolean isStop = false;
private String text;
private TextStreamItem item;
}

private static class TextStreamItem {
public TextStreamItem(String text) {
this.text = text;
this.flush = false;
}

public TextStreamItem(boolean flush) {
this.text = null;
this.flush = flush;
}

public TextStreamItem(boolean flush, JsonObject params) {
this.text = null;
this.flush = flush;
this.params = params;
}
Comment on lines +790 to +794
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The JsonObject passed as params is a mutable object. Since the serialization of this item occurs asynchronously in the RxJava stream (within the getStreamingData method), any modifications made to the params object by the caller after calling streamingFlush but before the message is actually processed and sent could lead to race conditions or inconsistent data. It is safer to store a deep copy of the parameters to ensure the state is captured at the moment of the call.

Suggested change
public TextStreamItem(boolean flush, JsonObject params) {
this.text = null;
this.flush = flush;
this.params = params;
}
public TextStreamItem(boolean flush, JsonObject params) {
this.text = null;
this.flush = flush;
this.params = params != null ? params.deepCopy() : null;
}


public String text;
public boolean flush;
public JsonObject params;
}

@SuperBuilder
private static class StreamInputTtsParamWithStream extends SpeechSynthesisParam {

@NonNull private Flowable<String> textStream;
@NonNull private Flowable<TextStreamItem> textStream;

public static StreamInputTtsParamWithStream fromStreamInputTtsParam(
SpeechSynthesisParam param,
Flowable<String> textStream,
Flowable<TextStreamItem> textStream,
String preRequestId,
boolean enableSsml) {
return StreamInputTtsParamWithStream.builder()
Expand All @@ -780,10 +825,19 @@ public static StreamInputTtsParamWithStream fromStreamInputTtsParam(
public Flowable<Object> getStreamingData() {
return textStream
.map(
text -> {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("text", text);
return jsonObject;
item -> {
if (!item.flush) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("text", item.text);
return jsonObject;
} else {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("flush", true);
if (item.params != null) {
JsonUtils.merge(jsonObject, item.params);
}
return jsonObject;
}
})
.cast(Object.class);
}
Expand Down
Loading