diff --git a/src/main/java/com/alibaba/dashscope/audio/ttsv2/SpeechSynthesizer.java b/src/main/java/com/alibaba/dashscope/audio/ttsv2/SpeechSynthesizer.java index 8b382ac..abf90cc 100644 --- a/src/main/java/com/alibaba/dashscope/audio/ttsv2/SpeechSynthesizer.java +++ b/src/main/java/com/alibaba/dashscope/audio/ttsv2/SpeechSynthesizer.java @@ -9,6 +9,7 @@ import com.alibaba.dashscope.exception.InputRequiredException; import com.alibaba.dashscope.exception.NoApiKeyException; import com.alibaba.dashscope.protocol.*; +import com.alibaba.dashscope.utils.JsonUtils; import com.google.gson.JsonObject; import io.reactivex.BackpressureStrategy; import io.reactivex.Emitter; @@ -44,7 +45,7 @@ public final class SpeechSynthesizer { SynchronizeFullDuplexApi duplexApi; private ApiServiceOption serviceOption; - private Emitter textEmitter; + private Emitter textEmitter; private ResultCallback callback; private SpeechSynthesisState state = SpeechSynthesisState.IDLE; @@ -235,10 +236,11 @@ public Flowable streamingCallAsFlowable(Flowable startStreamTimeStamp = System.currentTimeMillis(); recvAudioLength = 0; preRequestId = UUID.randomUUID().toString(); + Flowable inputStream = textStream.map(text -> new TextStreamItem(text)); return duplexApi .duplexCall( StreamInputTtsParamWithStream.fromStreamInputTtsParam( - this.parameters, textStream, preRequestId, false)) + this.parameters, inputStream, preRequestId, false)) .filter(item -> item.getEvent() != WebSocketEventType.TASK_STARTED.getValue()) .map(SpeechSynthesisResult::fromDashScopeResult) .filter(item -> !canceled.get()) @@ -284,7 +286,7 @@ public Flowable callAsFlowable(String text) emitter -> { new Thread( () -> { - emitter.onNext(text); + emitter.onNext(new TextStreamItem(text)); emitter.onComplete(); }) .start(); @@ -336,7 +338,7 @@ private void startStream(boolean enableSsml) { // timestamps.clear(); WritableByteChannel channel = Channels.newChannel(outputStream); - Flowable textFrames = + Flowable textFrames = Flowable.create( emitter -> { synchronized (SpeechSynthesizer.this) { @@ -346,7 +348,7 @@ private void startStream(boolean enableSsml) { emitter.onComplete(); return; } else { - emitter.onNext(buffer.text); + emitter.onNext(buffer.item); } } cmdBuffer.clear(); @@ -507,10 +509,10 @@ private void submitText(String text) { } if (textEmitter == null) { log.debug("submitText to new emitter: " + text); - cmdBuffer.add(AsyncCmdBuffer.builder().text(text).build()); + cmdBuffer.add(AsyncCmdBuffer.builder().item(new TextStreamItem(text)).build()); } else { log.debug("submitText to emitter: " + text); - textEmitter.onNext(text); + textEmitter.onNext(new TextStreamItem(text)); } } } @@ -660,6 +662,40 @@ public void streamingCall(String text) { } } + public void streamingFlush() { + synchronized (this) { + if (state != SpeechSynthesisState.TTS_STARTED) { + throw new ApiException( + new InputRequiredException( + "State invalid: expect stream input tts state is started but " + state.getValue())); + } + if (textEmitter == null) { + log.debug("submitFlush to new emitter"); + cmdBuffer.add(AsyncCmdBuffer.builder().item(new TextStreamItem(true)).build()); + } else { + log.debug("submitFlush to emitter"); + textEmitter.onNext(new TextStreamItem(true)); + } + } + } + + public void streamingFlush(JsonObject params) { + synchronized (this) { + if (state != SpeechSynthesisState.TTS_STARTED) { + throw new ApiException( + new InputRequiredException( + "State invalid: expect stream input tts state is started but " + state.getValue())); + } + if (textEmitter == null) { + log.debug("submitFlush to new emitter"); + cmdBuffer.add(AsyncCmdBuffer.builder().item(new TextStreamItem(true, params)).build()); + } else { + log.debug("submitFlush to emitter"); + textEmitter.onNext(new TextStreamItem(true, params)); + } + } + } + /** * Speech synthesis If a callback is set, the audio will be returned in real-time through the * on_event interface Otherwise, this function blocks until all audio is received and then returns @@ -750,17 +786,39 @@ public ByteBuffer call(String text) { @SuperBuilder private static class AsyncCmdBuffer { @Builder.Default private boolean isStop = false; - private String text; + private TextStreamItem item; + } + + private class TextStreamItem { + public TextStreamItem(String text) { + this.text = text; + this.flush = false; + } + + public TextStreamItem(boolean flush) { + this.text = null; + this.flush = flush; + } + + public TextStreamItem(boolean flush, JsonObject params) { + this.text = null; + this.flush = flush; + this.params = params; + } + + public String text; + public boolean flush; + public JsonObject params; } @SuperBuilder private static class StreamInputTtsParamWithStream extends SpeechSynthesisParam { - @NonNull private Flowable textStream; + @NonNull private Flowable textStream; public static StreamInputTtsParamWithStream fromStreamInputTtsParam( SpeechSynthesisParam param, - Flowable textStream, + Flowable textStream, String preRequestId, boolean enableSsml) { return StreamInputTtsParamWithStream.builder() @@ -780,10 +838,24 @@ public static StreamInputTtsParamWithStream fromStreamInputTtsParam( public Flowable getStreamingData() { return textStream .map( - text -> { - JsonObject jsonObject = new JsonObject(); - jsonObject.addProperty("text", text); - return jsonObject; + item -> { + if (!item.flush) { + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("text", item.text); + return jsonObject; + } else { + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("flush", true); + if (item.params != null && !item.params.isJsonNull()) { + for (String key : item.params.keySet()) { + Object value = item.params.get(key); + if (key != null && value != null) { + jsonObject.add(key, JsonUtils.toJsonElement(value)); + } + } + } + return jsonObject; + } }) .cast(Object.class); }