Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 86 additions & 14 deletions src/main/java/com/alibaba/dashscope/audio/ttsv2/SpeechSynthesizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.alibaba.dashscope.exception.InputRequiredException;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.protocol.*;
import com.alibaba.dashscope.utils.JsonUtils;
import com.google.gson.JsonObject;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Emitter;
Expand Down Expand Up @@ -44,7 +45,7 @@ public final class SpeechSynthesizer {
SynchronizeFullDuplexApi<SpeechSynthesisParam> duplexApi;

private ApiServiceOption serviceOption;
private Emitter<String> textEmitter;
private Emitter<TextStreamItem> textEmitter;
private ResultCallback<SpeechSynthesisResult> callback;
private SpeechSynthesisState state = SpeechSynthesisState.IDLE;

Expand Down Expand Up @@ -235,10 +236,11 @@ public Flowable<SpeechSynthesisResult> streamingCallAsFlowable(Flowable<String>
startStreamTimeStamp = System.currentTimeMillis();
recvAudioLength = 0;
preRequestId = UUID.randomUUID().toString();
Flowable<TextStreamItem> inputStream = textStream.map(text -> new TextStreamItem(text));
return duplexApi
.duplexCall(
StreamInputTtsParamWithStream.fromStreamInputTtsParam(
this.parameters, textStream, preRequestId, false))
this.parameters, inputStream, preRequestId, false))
.filter(item -> item.getEvent() != WebSocketEventType.TASK_STARTED.getValue())
.map(SpeechSynthesisResult::fromDashScopeResult)
.filter(item -> !canceled.get())
Expand Down Expand Up @@ -284,7 +286,7 @@ public Flowable<SpeechSynthesisResult> callAsFlowable(String text)
emitter -> {
new Thread(
() -> {
emitter.onNext(text);
emitter.onNext(new TextStreamItem(text));
emitter.onComplete();
})
.start();
Expand Down Expand Up @@ -336,7 +338,7 @@ private void startStream(boolean enableSsml) {
// timestamps.clear();
WritableByteChannel channel = Channels.newChannel(outputStream);

Flowable<String> textFrames =
Flowable<TextStreamItem> textFrames =
Flowable.create(
emitter -> {
synchronized (SpeechSynthesizer.this) {
Expand All @@ -346,7 +348,7 @@ private void startStream(boolean enableSsml) {
emitter.onComplete();
return;
} else {
emitter.onNext(buffer.text);
emitter.onNext(buffer.item);
}
}
cmdBuffer.clear();
Expand Down Expand Up @@ -507,10 +509,10 @@ private void submitText(String text) {
}
if (textEmitter == null) {
log.debug("submitText to new emitter: " + text);
cmdBuffer.add(AsyncCmdBuffer.builder().text(text).build());
cmdBuffer.add(AsyncCmdBuffer.builder().item(new TextStreamItem(text)).build());
} else {
log.debug("submitText to emitter: " + text);
textEmitter.onNext(text);
textEmitter.onNext(new TextStreamItem(text));
}
}
}
Expand Down Expand Up @@ -660,6 +662,40 @@ public void streamingCall(String text) {
}
}

/**
 * Submits a flush request to the text stream without any extra parameters.
 *
 * <p>Delegates to {@link #streamingFlush(JsonObject)} with a {@code null} parameter object so the
 * state check and the buffering/emitting logic live in one place.
 *
 * @throws ApiException wrapping an {@link InputRequiredException} when the synthesizer state is
 *     not {@code TTS_STARTED}
 */
public void streamingFlush() {
  // The cast pins the overload explicitly; a bare null would become ambiguous if another
  // single-argument streamingFlush overload were ever added.
  streamingFlush((JsonObject) null);
}
Comment on lines +665 to +680
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The streamingFlush() method contains logic identical to streamingFlush(JsonObject params). It should be refactored to call the parameterized version with null to reduce code duplication and improve maintainability.

  public void streamingFlush() {
    streamingFlush(null);
  }


/**
 * Submits a flush request, optionally carrying extra parameters, to the text stream.
 *
 * <p>When no text emitter is attached yet the request is queued in {@code cmdBuffer}; otherwise
 * it is pushed to the emitter directly. The whole operation runs while holding this instance's
 * monitor.
 *
 * @param params additional parameters attached to the flush item; may be {@code null}
 * @throws ApiException wrapping an {@link InputRequiredException} when the synthesizer state is
 *     not {@code TTS_STARTED}
 */
public void streamingFlush(JsonObject params) {
  synchronized (this) {
    boolean started = (state == SpeechSynthesisState.TTS_STARTED);
    if (!started) {
      throw new ApiException(
          new InputRequiredException(
              "State invalid: expect stream input tts state is started but " + state.getValue()));
    }

    TextStreamItem flushItem = new TextStreamItem(true, params);
    if (textEmitter != null) {
      // An emitter is already attached: forward the flush immediately.
      log.debug("submitFlush to emitter");
      textEmitter.onNext(flushItem);
      return;
    }

    // No emitter yet: park the flush in the command buffer.
    log.debug("submitFlush to new emitter");
    cmdBuffer.add(AsyncCmdBuffer.builder().item(flushItem).build());
  }
}

/**
* Speech synthesis If a callback is set, the audio will be returned in real-time through the
* on_event interface Otherwise, this function blocks until all audio is received and then returns
Expand Down Expand Up @@ -750,17 +786,39 @@ public ByteBuffer call(String text) {
/**
 * Entry stored in {@code cmdBuffer}. Callers add one via the builder when no text emitter is
 * attached yet, and the buffered {@code item} is later handed to an emitter with {@code onNext}.
 */
@SuperBuilder
private static class AsyncCmdBuffer {
// Defaults to false through the builder. NOTE(review): presumably marks the end-of-stream
// command that completes the emitter -- confirm against the code that drains cmdBuffer.
@Builder.Default private boolean isStop = false;
private String text;
private TextStreamItem item;
}

private class TextStreamItem {
public TextStreamItem(String text) {
this.text = text;
this.flush = false;
}

public TextStreamItem(boolean flush) {
this.text = null;
this.flush = flush;
}

public TextStreamItem(boolean flush, JsonObject params) {
this.text = null;
this.flush = flush;
this.params = params;
}

public String text;
public boolean flush;
public JsonObject params;
}
Comment on lines +792 to 812
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

TextStreamItem should be declared as a static inner class because it does not require access to the outer SpeechSynthesizer instance. Additionally, making its fields final ensures immutability and thread safety, which is a best practice when objects are passed through reactive streams.

  private static class TextStreamItem {
    public final String text;
    public final boolean flush;
    public final JsonObject params;

    public TextStreamItem(String text) {
      this.text = text;
      this.flush = false;
      this.params = null;
    }

    public TextStreamItem(boolean flush) {
      this.text = null;
      this.flush = flush;
      this.params = null;
    }

    public TextStreamItem(boolean flush, JsonObject params) {
      this.text = null;
      this.flush = flush;
      this.params = params;
    }
  }


@SuperBuilder
private static class StreamInputTtsParamWithStream extends SpeechSynthesisParam {

@NonNull private Flowable<String> textStream;
@NonNull private Flowable<TextStreamItem> textStream;

public static StreamInputTtsParamWithStream fromStreamInputTtsParam(
SpeechSynthesisParam param,
Flowable<String> textStream,
Flowable<TextStreamItem> textStream,
String preRequestId,
boolean enableSsml) {
return StreamInputTtsParamWithStream.builder()
Expand All @@ -780,10 +838,24 @@ public static StreamInputTtsParamWithStream fromStreamInputTtsParam(
public Flowable<Object> getStreamingData() {
return textStream
.map(
text -> {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("text", text);
return jsonObject;
item -> {
if (!item.flush) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("text", item.text);
return jsonObject;
} else {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("flush", true);
if (item.params != null && !item.params.isJsonNull()) {
for (String key : item.params.keySet()) {
Object value = item.params.get(key);
if (key != null && value != null) {
jsonObject.add(key, JsonUtils.toJsonElement(value));
}
}
}
Comment on lines +849 to +856
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The manual iteration and copying of parameters can be simplified by using the existing JsonUtils.merge utility. This is more concise, avoids redundant null checks on keys, and prevents unnecessary re-wrapping of JsonElement values via JsonUtils.toJsonElement.

                  if (item.params != null) {
                    JsonUtils.merge(jsonObject, item.params);
                  }

return jsonObject;
}
})
.cast(Object.class);
}
Expand Down
Loading