Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
import java.io.ByteArrayOutputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;

/**
* Utility entry point that adapts an Azure {@link com.azure.core.http.HttpClient} so it can be consumed by
Expand Down Expand Up @@ -87,9 +89,9 @@ public HttpResponse execute(HttpRequest request, RequestOptions requestOptions)
Objects.requireNonNull(requestOptions, "requestOptions");

try {
com.azure.core.http.HttpRequest azureRequest = buildAzureRequest(request);
return new AzureHttpResponseAdapter(
this.httpPipeline.sendSync(azureRequest, buildRequestContext(requestOptions)));
PreparedRequest prepared = buildAzureRequest(request);
return new AzureHttpResponseAdapter(this.httpPipeline.sendSync(prepared.azureRequest,
buildRequestContext(requestOptions, prepared.isStreaming)));
} catch (MalformedURLException exception) {
throw new OpenAIException("Invalid URL in request: " + exception.getMessage(),
LOGGER.logThrowableAsError(exception));
Expand All @@ -107,7 +109,8 @@ public CompletableFuture<HttpResponse> executeAsync(HttpRequest request, Request
Objects.requireNonNull(requestOptions, "requestOptions");

return Mono.fromCallable(() -> buildAzureRequest(request))
.flatMap(azureRequest -> this.httpPipeline.send(azureRequest, buildRequestContext(requestOptions)))
.flatMap(prepared -> this.httpPipeline.send(prepared.azureRequest,
buildRequestContext(requestOptions, prepared.isStreaming)))
.map(response -> (HttpResponse) new AzureHttpResponseAdapter(response))
.onErrorMap(HttpClientWrapper::mapAzureExceptionToOpenAI)
.toFuture();
Expand Down Expand Up @@ -188,18 +191,50 @@ private static Headers toOpenAIHeaders(HttpHeaders azureHeaders) {
}

/**
* Converts the OpenAI request metadata and body into an Azure {@link com.azure.core.http.HttpRequest}.
* Pattern that matches a top-level {@code "stream": true} JSON field, allowing for
* optional whitespace around the colon. This is used to detect streaming requests
* so the Azure pipeline does not eagerly buffer the SSE response body.
*/
private static com.azure.core.http.HttpRequest buildAzureRequest(HttpRequest request)
throws MalformedURLException {
private static final Pattern STREAM_TRUE_PATTERN = Pattern.compile("\"stream\"\\s*:\\s*true");

/**
 * Pairs the converted Azure request with a flag recording whether the original
 * OpenAI request asked for a streamed (SSE) response.
 */
static final class PreparedRequest {
    // The request translated into the Azure pipeline's representation.
    final com.azure.core.http.HttpRequest azureRequest;
    // True when the JSON body carried "stream": true.
    final boolean isStreaming;

    PreparedRequest(com.azure.core.http.HttpRequest request, boolean streaming) {
        this.azureRequest = request;
        this.isStreaming = streaming;
    }
}

/**
* Converts the OpenAI request metadata and body into an Azure {@link com.azure.core.http.HttpRequest}
* and determines whether the request is a streaming request.
*/
private static PreparedRequest buildAzureRequest(HttpRequest request) throws MalformedURLException {
HttpRequestBody requestBody = request.body();
String contentType = requestBody != null ? requestBody.contentType() : null;
BinaryData bodyData = null;
boolean isStreaming = false;

if (requestBody != null && requestBody.contentLength() != 0) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
requestBody.writeTo(outputStream);
bodyData = BinaryData.fromBytes(outputStream.toByteArray());
byte[] bodyBytes = outputStream.toByteArray();
bodyData = BinaryData.fromBytes(bodyBytes);

// Detect streaming requests by checking for "stream": true in the JSON body.
// When the OpenAI SDK calls createStreaming(), it serializes stream=true into the
// request body. Streaming responses use SSE (Server-Sent Events) and must NOT be
// eagerly buffered by the Azure HTTP pipeline.
if (contentType != null && contentType.contains("json") && bodyBytes.length > 0) {
String bodyStr = new String(bodyBytes, StandardCharsets.UTF_8);
isStreaming = STREAM_TRUE_PATTERN.matcher(bodyStr).find();
}
}

HttpHeaders headers = toAzureHeaders(request.headers());
Expand All @@ -214,7 +249,7 @@ private static com.azure.core.http.HttpRequest buildAzureRequest(HttpRequest req
azureRequest.setBody(bodyData);
}

return azureRequest;
return new PreparedRequest(azureRequest, isStreaming);
}

/**
Expand All @@ -236,11 +271,18 @@ private static HttpHeaders toAzureHeaders(Headers sourceHeaders) {

/**
* Builds the request context from the given request options.
*
* <p>When {@code isStreaming} is {@code true}, the {@code azure-eagerly-read-response}
* context flag is set to {@code false} so the Azure HTTP pipeline returns a live
* streaming body instead of buffering the entire response into memory. This is
* required for SSE (Server-Sent Events) streaming to deliver events incrementally.</p>
*
* @param requestOptions OpenAI SDK request options
* @param isStreaming whether the request is a streaming request
* @return Azure request {@link Context}
*/
private static Context buildRequestContext(RequestOptions requestOptions) {
Context context = new Context("azure-eagerly-read-response", true);
private static Context buildRequestContext(RequestOptions requestOptions, boolean isStreaming) {
Context context = new Context("azure-eagerly-read-response", !isStreaming);
Timeout timeout = requestOptions.getTimeout();
// we use "read" as it's the closest thing to the "response timeout"
if (timeout != null && !timeout.read().isZero() && !timeout.read().isNegative()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.function.Function;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -160,6 +161,187 @@ void executeAsyncPropagatesHttpClientFailures() {
assertEquals("Network error", cause.getMessage());
}

// ========================================================================
// Streaming detection tests — verifies fix for issue #48726
// ========================================================================

@Test
void nonStreamingRequestSetsEagerlyReadToTrue() {
    ContextCapturingHttpClient transport = new ContextCapturingHttpClient();
    com.openai.core.http.HttpClient client
        = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(transport).build());

    // A plain JSON body without "stream":true must keep eager buffering on.
    String json = "{\"model\":\"gpt-4o\",\"input\":\"Hello\"}";
    com.openai.core.http.HttpRequest request = com.openai.core.http.HttpRequest.builder()
        .method(com.openai.core.http.HttpMethod.POST)
        .baseUrl("https://example.com")
        .addPathSegment("responses")
        .body(new TestHttpRequestBody(json, "application/json"))
        .build();

    try (com.openai.core.http.HttpResponse response = client.execute(request)) {
        assertEquals(200, response.statusCode());
    }

    Context context = transport.getLastContext();
    assertNotNull(context, "Context should have been captured");
    Object eagerlyRead = context.getData("azure-eagerly-read-response").orElse(null);
    assertTrue((Boolean) eagerlyRead, "Non-streaming requests should have azure-eagerly-read-response=true");
}

@Test
void streamingRequestSetsEagerlyReadToFalse() {
    ContextCapturingHttpClient transport = new ContextCapturingHttpClient();
    com.openai.core.http.HttpClient client
        = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(transport).build());

    // A JSON body carrying "stream":true must disable eager buffering.
    String json = "{\"model\":\"gpt-4o\",\"input\":\"Hello\",\"stream\":true}";
    com.openai.core.http.HttpRequest request = com.openai.core.http.HttpRequest.builder()
        .method(com.openai.core.http.HttpMethod.POST)
        .baseUrl("https://example.com")
        .addPathSegment("responses")
        .body(new TestHttpRequestBody(json, "application/json"))
        .build();

    try (com.openai.core.http.HttpResponse response = client.execute(request)) {
        assertEquals(200, response.statusCode());
    }

    Context context = transport.getLastContext();
    assertNotNull(context, "Context should have been captured");
    Object eagerlyRead = context.getData("azure-eagerly-read-response").orElse(null);
    assertFalse((Boolean) eagerlyRead, "Streaming requests should have azure-eagerly-read-response=false");
}

@Test
void streamingRequestWithWhitespaceSetsEagerlyReadToFalse() {
    ContextCapturingHttpClient transport = new ContextCapturingHttpClient();
    com.openai.core.http.HttpClient client
        = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(transport).build());

    // The detector tolerates whitespace around the colon of the "stream" field.
    String json = "{\"model\":\"gpt-4o\", \"stream\" : true, \"input\":\"Hello\"}";
    com.openai.core.http.HttpRequest request = com.openai.core.http.HttpRequest.builder()
        .method(com.openai.core.http.HttpMethod.POST)
        .baseUrl("https://example.com")
        .addPathSegment("responses")
        .body(new TestHttpRequestBody(json, "application/json"))
        .build();

    try (com.openai.core.http.HttpResponse response = client.execute(request)) {
        assertEquals(200, response.statusCode());
    }

    Context context = transport.getLastContext();
    assertNotNull(context, "Context should have been captured");
    Object eagerlyRead = context.getData("azure-eagerly-read-response").orElse(null);
    assertFalse((Boolean) eagerlyRead,
        "Streaming requests with whitespace around colon should have azure-eagerly-read-response=false");
}

@Test
void streamingAsyncRequestSetsEagerlyReadToFalse() {
    ContextCapturingHttpClient transport = new ContextCapturingHttpClient();
    com.openai.core.http.HttpClient client
        = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(transport).build());

    // Same streaming body as the sync test, sent through the async path.
    String json = "{\"model\":\"gpt-4o\",\"input\":\"Hello\",\"stream\":true}";
    com.openai.core.http.HttpRequest request = com.openai.core.http.HttpRequest.builder()
        .method(com.openai.core.http.HttpMethod.POST)
        .baseUrl("https://example.com")
        .addPathSegment("responses")
        .body(new TestHttpRequestBody(json, "application/json"))
        .build();

    CompletableFuture<com.openai.core.http.HttpResponse> pending = client.executeAsync(request);
    try (com.openai.core.http.HttpResponse response = pending.join()) {
        assertEquals(200, response.statusCode());
    }

    Context context = transport.getLastContext();
    assertNotNull(context, "Context should have been captured");
    Object eagerlyRead = context.getData("azure-eagerly-read-response").orElse(null);
    assertFalse((Boolean) eagerlyRead, "Async streaming requests should have azure-eagerly-read-response=false");
}

@Test
void nonJsonBodySetsEagerlyReadToTrue() {
    ContextCapturingHttpClient transport = new ContextCapturingHttpClient();
    com.openai.core.http.HttpClient client
        = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(transport).build());

    // A text/plain body that merely contains the text "stream":true must not trip the detector.
    com.openai.core.http.HttpRequest request = com.openai.core.http.HttpRequest.builder()
        .method(com.openai.core.http.HttpMethod.POST)
        .baseUrl("https://example.com")
        .body(new TestHttpRequestBody("stream\":true", "text/plain"))
        .build();

    try (com.openai.core.http.HttpResponse response = client.execute(request)) {
        assertEquals(200, response.statusCode());
    }

    Context context = transport.getLastContext();
    assertNotNull(context, "Context should have been captured");
    Object eagerlyRead = context.getData("azure-eagerly-read-response").orElse(null);
    assertTrue((Boolean) eagerlyRead, "Non-JSON bodies should always have azure-eagerly-read-response=true");
}

@Test
void streamFalseInBodySetsEagerlyReadToTrue() {
    ContextCapturingHttpClient transport = new ContextCapturingHttpClient();
    com.openai.core.http.HttpClient client
        = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(transport).build());

    // An explicit "stream":false must be treated as non-streaming.
    String json = "{\"model\":\"gpt-4o\",\"input\":\"Hello\",\"stream\":false}";
    com.openai.core.http.HttpRequest request = com.openai.core.http.HttpRequest.builder()
        .method(com.openai.core.http.HttpMethod.POST)
        .baseUrl("https://example.com")
        .addPathSegment("responses")
        .body(new TestHttpRequestBody(json, "application/json"))
        .build();

    try (com.openai.core.http.HttpResponse response = client.execute(request)) {
        assertEquals(200, response.statusCode());
    }

    Context context = transport.getLastContext();
    assertNotNull(context, "Context should have been captured");
    Object eagerlyRead = context.getData("azure-eagerly-read-response").orElse(null);
    assertTrue((Boolean) eagerlyRead, "Requests with stream=false should have azure-eagerly-read-response=true");
}

@Test
void noBodySetsEagerlyReadToTrue() {
    ContextCapturingHttpClient transport = new ContextCapturingHttpClient();
    com.openai.core.http.HttpClient client
        = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(transport).build());

    // A bodiless GET can never be a streaming request.
    com.openai.core.http.HttpRequest request = com.openai.core.http.HttpRequest.builder()
        .method(com.openai.core.http.HttpMethod.GET)
        .baseUrl("https://example.com")
        .addPathSegment("test")
        .build();

    try (com.openai.core.http.HttpResponse response = client.execute(request)) {
        assertEquals(200, response.statusCode());
    }

    Context context = transport.getLastContext();
    assertNotNull(context, "Context should have been captured");
    Object eagerlyRead = context.getData("azure-eagerly-read-response").orElse(null);
    assertTrue((Boolean) eagerlyRead, "Requests without a body should have azure-eagerly-read-response=true");
}

// ========================================================================
// Test helpers
// ========================================================================

private static com.openai.core.http.HttpRequest createOpenAiRequest() {
return com.openai.core.http.HttpRequest.builder()
.method(com.openai.core.http.HttpMethod.POST)
Expand Down Expand Up @@ -219,6 +401,35 @@ int getSendCount() {
}
}

/**
 * Mock HTTP client that records the {@link Context} passed to each send call so
 * tests can inspect pipeline flags such as {@code azure-eagerly-read-response}.
 */
private static final class ContextCapturingHttpClient implements HttpClient {
    // Most recent Context observed; null until a request has gone through.
    private Context observed;

    Context getLastContext() {
        return observed;
    }

    @Override
    public HttpResponse sendSync(HttpRequest request, Context context) {
        observed = context;
        return createMockResponse(request, 200, new HttpHeaders(), "{}");
    }

    @Override
    public Mono<HttpResponse> send(HttpRequest request) {
        return send(request, Context.NONE);
    }

    @Override
    public Mono<HttpResponse> send(HttpRequest request, Context context) {
        observed = context;
        return Mono.just(createMockResponse(request, 200, new HttpHeaders(), "{}"));
    }
}

private static final class TestHttpRequestBody implements HttpRequestBody {
private final byte[] content;
private final String contentType;
Expand Down
Loading