Skip to content

Commit 513379d

Browse files
Implement Unwrap method to responseAdapter to properly handle HTTP event-streaming (like Server Send Events) without buffering (#1090)
Implement Unwrap method to responseAdapter to properly handle HTTP event-streaming (like Server Send Events). Add unit test to check if response was flushed. <!-- Include the issue number below --> Fixes golang/go#27816 ## Proposed Changes * GoLang developers implemented the streaming events by immediate flushing the response if detects the proper header. However, the response writer need to implement Flush or Unwrap methods to make it work. responseAdapter used in loadBalancer (in reverseProxy) was not implementing it, so SSE events were pushed to clients **only if buffers were overloaded** (4K bytes). This change fixes this behavior. ## Release Notes ```release-note Fixed the issue with not flushing immediately HTTP text/event-streaming in jsessionid-lb. ```
2 parents da51826 + aa47efb commit 513379d

2 files changed

Lines changed: 40 additions & 0 deletions

File tree

samples/apps/jsessionid-lb/pkg/loadbalancer.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ type responseAdapter struct {
3737
stickyCookie string
3838
}
3939

40+
// Unwrap implements http.ResponseWriter to allow the immediate flushing for HTTP Streaming events
41+
func (a *responseAdapter) Unwrap() http.ResponseWriter {
42+
return a.ResponseWriter
43+
}
44+
4045
func (a *responseAdapter) WriteHeader(statusCode int) {
4146
// Append the sticky cookie the response has a session cookie set.
4247

samples/apps/jsessionid-lb/pkg/loadbalancer_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ package pkg
1717
import (
1818
"bytes"
1919
"errors"
20+
"fmt"
2021
"log"
2122
"net"
2223
"net/http"
2324
"net/http/httptest"
25+
"net/http/httputil"
2426
"testing"
2527
)
2628

@@ -240,6 +242,39 @@ func TestLoadBalancer_ServeHTTP(t *testing.T) {
240242
AssertCookie(t, "SESSION", "http://1.1.1.1:8080", recorder)
241243
},
242244
},
245+
"SSE Event stream flushed immediately": {
246+
loadBalancer: func() *LoadBalancer {
247+
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
248+
w.Header().Set("Content-Type", "text/event-stream")
249+
if flusher, ok := w.(http.Flusher); ok {
250+
fmt.Fprintf(w, "data: 1\n\n")
251+
flusher.Flush()
252+
}
253+
}))
254+
lb := &LoadBalancer{
255+
LookupIP: lookupOneIP,
256+
ReverseProxy: &httputil.ReverseProxy{
257+
Director: func(req *http.Request) {
258+
req.URL.Scheme = "http"
259+
req.URL.Host = backend.Listener.Addr().String()
260+
},
261+
},
262+
}
263+
defaultValues(nil, lb)
264+
return lb
265+
}(),
266+
request: func() *http.Request {
267+
req := defaultRequest()
268+
req.Header.Set("Accept", "text/event-stream")
269+
return req
270+
}(),
271+
checkResponse: func(t *testing.T, recorder *httptest.ResponseRecorder) {
272+
if !recorder.Flushed {
273+
t.Errorf("Expected response to be flushed for SSE")
274+
}
275+
AssertResponseCode(t, http.StatusOK, recorder)
276+
},
277+
},
243278
} {
244279
t.Run(tn, func(t *testing.T) {
245280
defaultValues(t, tc.loadBalancer)

0 commit comments

Comments
 (0)