Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.alibaba.dashscope.exception.InputRequiredException;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.protocol.*;
import com.alibaba.dashscope.utils.JsonUtils;
import com.google.gson.JsonObject;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Emitter;
Expand All @@ -19,7 +20,6 @@
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
Expand All @@ -44,7 +44,7 @@ public final class SpeechSynthesizer {
SynchronizeFullDuplexApi<SpeechSynthesisParam> duplexApi;

private ApiServiceOption serviceOption;
private Emitter<String> textEmitter;
private Emitter<TextStreamItem> textEmitter;
private ResultCallback<SpeechSynthesisResult> callback;
private SpeechSynthesisState state = SpeechSynthesisState.IDLE;

Expand Down Expand Up @@ -235,10 +235,11 @@ public Flowable<SpeechSynthesisResult> streamingCallAsFlowable(Flowable<String>
startStreamTimeStamp = System.currentTimeMillis();
recvAudioLength = 0;
preRequestId = UUID.randomUUID().toString();
Flowable<TextStreamItem> inputStream = textStream.map(text -> new TextStreamItem(text));
return duplexApi
.duplexCall(
StreamInputTtsParamWithStream.fromStreamInputTtsParam(
this.parameters, textStream, preRequestId, false))
this.parameters, inputStream, preRequestId, false))
.filter(item -> item.getEvent() != WebSocketEventType.TASK_STARTED.getValue())
.map(SpeechSynthesisResult::fromDashScopeResult)
.filter(item -> !canceled.get())
Expand Down Expand Up @@ -284,7 +285,7 @@ public Flowable<SpeechSynthesisResult> callAsFlowable(String text)
emitter -> {
new Thread(
() -> {
emitter.onNext(text);
emitter.onNext(new TextStreamItem(text));
emitter.onComplete();
})
.start();
Expand Down Expand Up @@ -336,7 +337,7 @@ private void startStream(boolean enableSsml) {
// timestamps.clear();
WritableByteChannel channel = Channels.newChannel(outputStream);

Flowable<String> textFrames =
Flowable<TextStreamItem> textFrames =
Flowable.create(
emitter -> {
synchronized (SpeechSynthesizer.this) {
Expand All @@ -346,7 +347,7 @@ private void startStream(boolean enableSsml) {
emitter.onComplete();
return;
} else {
emitter.onNext(buffer.text);
emitter.onNext(buffer.item);
}
}
cmdBuffer.clear();
Expand Down Expand Up @@ -496,7 +497,7 @@ public void onError(Exception e) {
* @param text utf-8 encoded text
*/
private void submitText(String text) {
if (Objects.equals(text, "")) {
if (text == null || text.isEmpty()) {
throw new ApiException(new InputRequiredException("Parameter invalid: text is null"));
}
synchronized (this) {
Expand All @@ -507,10 +508,10 @@ private void submitText(String text) {
}
if (textEmitter == null) {
log.debug("submitText to new emitter: " + text);
cmdBuffer.add(AsyncCmdBuffer.builder().text(text).build());
cmdBuffer.add(AsyncCmdBuffer.builder().item(new TextStreamItem(text)).build());
} else {
log.debug("submitText to emitter: " + text);
textEmitter.onNext(text);
textEmitter.onNext(new TextStreamItem(text));
}
}
}
Expand Down Expand Up @@ -660,6 +661,27 @@ public void streamingCall(String text) {
}
}

/**
 * Convenience overload of {@link #streamingFlush(JsonObject)} that passes no extra parameters.
 *
 * <p>Delegates with a {@code null} parameter object.
 */
public void streamingFlush() {
  final JsonObject noParams = null;
  streamingFlush(noParams);
}

public void streamingFlush(JsonObject params) {
synchronized (this) {
if (state != SpeechSynthesisState.TTS_STARTED) {
throw new ApiException(
new InputRequiredException(
Comment thread
songguocola marked this conversation as resolved.
"State invalid: expect stream input tts state is started but " + state.getValue()));
}
if (textEmitter == null) {
log.debug("submitFlush to new emitter");
cmdBuffer.add(AsyncCmdBuffer.builder().item(new TextStreamItem(true, params)).build());
} else {
log.debug("submitFlush to emitter");
textEmitter.onNext(new TextStreamItem(true, params));
}
}
}

/**
* Speech synthesis. If a callback is set, the audio will be returned in real-time through the
* on_event interface. Otherwise, this function blocks until all audio is received and then returns
Expand Down Expand Up @@ -750,17 +772,39 @@ public ByteBuffer call(String text) {
@SuperBuilder
private static class AsyncCmdBuffer {
@Builder.Default private boolean isStop = false;
private String text;
private TextStreamItem item;
}

/**
 * One element of the outgoing text stream: either a piece of text to synthesize or a flush
 * marker, optionally carrying extra JSON parameters.
 *
 * <p>Instances are immutable. All fields are {@code final}, so an item created on the caller's
 * thread can be handed to the stream consumer without further synchronization on the item
 * itself. Note that {@code params} is stored by reference and is not copied.
 */
private static class TextStreamItem {
  /** Text payload; {@code null} for flush markers. */
  public final String text;

  /** {@code true} when this item is a flush marker rather than text. */
  public final boolean flush;

  /** Optional extra parameters attached to a flush marker; may be {@code null}. */
  public final JsonObject params;

  /**
   * Creates a plain text item (not a flush marker, no parameters).
   *
   * @param text the text payload
   */
  public TextStreamItem(String text) {
    this(text, false, null);
  }

  /**
   * Creates an item with no text and no parameters.
   *
   * @param flush whether the item is a flush marker
   */
  public TextStreamItem(boolean flush) {
    this(null, flush, null);
  }

  /**
   * Creates an item with no text that carries optional parameters.
   *
   * @param flush whether the item is a flush marker
   * @param params extra parameters to attach; may be {@code null}
   */
  public TextStreamItem(boolean flush, JsonObject params) {
    this(null, flush, params);
  }

  /** Canonical constructor; every public constructor funnels through here. */
  private TextStreamItem(String text, boolean flush, JsonObject params) {
    this.text = text;
    this.flush = flush;
    this.params = params;
  }
}
Comment thread
songguocola marked this conversation as resolved.

@SuperBuilder
private static class StreamInputTtsParamWithStream extends SpeechSynthesisParam {

@NonNull private Flowable<String> textStream;
@NonNull private Flowable<TextStreamItem> textStream;

public static StreamInputTtsParamWithStream fromStreamInputTtsParam(
SpeechSynthesisParam param,
Flowable<String> textStream,
Flowable<TextStreamItem> textStream,
String preRequestId,
boolean enableSsml) {
return StreamInputTtsParamWithStream.builder()
Expand All @@ -780,10 +824,19 @@ public static StreamInputTtsParamWithStream fromStreamInputTtsParam(
public Flowable<Object> getStreamingData() {
return textStream
.map(
text -> {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("text", text);
return jsonObject;
item -> {
if (!item.flush) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("text", item.text);
return jsonObject;
} else {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("flush", true);
if (item.params != null) {
JsonUtils.merge(jsonObject, item.params);
}
return jsonObject;
}
})
.cast(Object.class);
}
Expand Down
Loading