@@ -2,6 +2,7 @@ package http3
22
33import (
44 "context"
5+ "errors"
56 "fmt"
67 "io"
78 "log/slog"
@@ -19,6 +20,8 @@ import (
1920 "github.com/quic-go/qpack"
2021)
2122
23+ var errGoAway = errors .New ("connection in graceful shutdown" )
24+
2225// Connection is an HTTP/3 connection.
2326// It has all methods from the quic.Connection expect for AcceptStream, AcceptUniStream,
2427// SendDatagram and ReceiveDatagram.
@@ -51,8 +54,10 @@ type connection struct {
5154
5255 decoder * qpack.Decoder
5356
54- streamMx sync.Mutex
55- streams map [quic.StreamID ]* datagrammer
57+ streamMx sync.Mutex
58+ streams map [quic.StreamID ]* datagrammer
59+ lastStreamID quic.StreamID
60+ maxStreamID quic.StreamID
5661
5762 settings * Settings
5863 receivedSettings chan struct {}
@@ -80,6 +85,8 @@ func newConnection(
8085 decoder : qpack .NewDecoder (func (hf qpack.HeaderField ) {}),
8186 receivedSettings : make (chan struct {}),
8287 streams : make (map [quic.StreamID ]* datagrammer ),
88+ maxStreamID : InvalidStreamID ,
89+ lastStreamID : InvalidStreamID ,
8390 Options : options ,
8491 }
8592 if idleTimeout > 0 {
@@ -100,6 +107,13 @@ func (c *connection) clearStream(id quic.StreamID) {
100107 if c .idleTimeout > 0 && len (c .streams ) == 0 {
101108 c .idleTimer .Reset (c .idleTimeout )
102109 }
110+ // The server is performing a graceful shutdown.
111+ // If no more streams are remaining, close the connection.
112+ if c .maxStreamID != InvalidStreamID {
113+ if len (c .streams ) == 0 {
114+ c .CloseWithError (quic .ApplicationErrorCode (ErrCodeNoError ), "" )
115+ }
116+ }
103117}
104118
105119func (c * connection ) openRequestStream (
@@ -109,13 +123,30 @@ func (c *connection) openRequestStream(
109123 disableCompression bool ,
110124 maxHeaderBytes uint64 ,
111125) (* requestStream , error ) {
126+ if c .perspective == PerspectiveClient {
127+ c .streamMx .Lock ()
128+ maxStreamID := c .maxStreamID
129+ var nextStreamID quic.StreamID
130+ if c .lastStreamID == InvalidStreamID {
131+ nextStreamID = 0
132+ } else {
133+ nextStreamID = c .lastStreamID + 4
134+ }
135+ c .streamMx .Unlock ()
136+ // Streams with stream ID equal to or greater than the stream ID carried in the GOAWAY frame
137+ // will be rejected, see section 5.2 of RFC 9114.
138+ if maxStreamID != InvalidStreamID && nextStreamID >= maxStreamID {
139+ return nil , errGoAway
140+ }
141+ }
112142 str , err := c .OpenStreamSync (ctx )
113143 if err != nil {
114144 return nil , err
115145 }
116146 datagrams := newDatagrammer (func (b []byte ) error { return c .sendDatagram (str .StreamID (), b ) })
117147 c .streamMx .Lock ()
118148 c .streams [str .StreamID ()] = datagrams
149+ c .lastStreamID = str .StreamID ()
119150 c .streamMx .Unlock ()
120151 qstr := newStateTrackingStream (str , c , datagrams )
121152 rsp := & http.Response {}
@@ -247,41 +278,94 @@ func (c *connection) handleUnidirectionalStreams(hijack func(StreamType, quic.Co
247278 c .Connection .CloseWithError (quic .ApplicationErrorCode (ErrCodeStreamCreationError ), "duplicate control stream" )
248279 return
249280 }
250- fp := & frameParser {conn : c .Connection , r : str }
251- f , err := fp .ParseNext ()
252- if err != nil {
253- c .Connection .CloseWithError (quic .ApplicationErrorCode (ErrCodeFrameError ), "" )
254- return
255- }
256- sf , ok := f .(* settingsFrame )
257- if ! ok {
258- c .Connection .CloseWithError (quic .ApplicationErrorCode (ErrCodeMissingSettings ), "" )
259- return
260- }
261- c .settings = & Settings {
262- EnableDatagrams : sf .Datagram ,
263- EnableExtendedConnect : sf .ExtendedConnect ,
264- Other : sf .Other ,
265- }
266- close (c .receivedSettings )
267- if ! sf .Datagram {
268- return
281+ c .handleControlStream (str )
282+ }(str )
283+ }
284+ }
285+
286+ func (c * connection ) handleControlStream (str quic.ReceiveStream ) {
287+ fp := & frameParser {conn : c .Connection , r : str }
288+ f , err := fp .ParseNext ()
289+ if err != nil {
290+ var serr * quic.StreamError
291+ if err == io .EOF || errors .As (err , & serr ) {
292+ c .Connection .CloseWithError (quic .ApplicationErrorCode (ErrCodeClosedCriticalStream ), "" )
293+ return
294+ }
295+ c .Connection .CloseWithError (quic .ApplicationErrorCode (ErrCodeFrameError ), "" )
296+ return
297+ }
298+ sf , ok := f .(* settingsFrame )
299+ if ! ok {
300+ c .Connection .CloseWithError (quic .ApplicationErrorCode (ErrCodeMissingSettings ), "" )
301+ return
302+ }
303+ c .settings = & Settings {
304+ EnableDatagrams : sf .Datagram ,
305+ EnableExtendedConnect : sf .ExtendedConnect ,
306+ Other : sf .Other ,
307+ }
308+ close (c .receivedSettings )
309+ if sf .Datagram {
310+ // If datagram support was enabled on our side as well as on the server side,
311+ // we can expect it to have been negotiated both on the transport and on the HTTP/3 layer.
312+ // Note: ConnectionState() will block until the handshake is complete (relevant when using 0-RTT).
313+ if c .enableDatagrams && ! c .ConnectionState ().SupportsDatagrams {
314+ c .CloseWithError (quic .ApplicationErrorCode (ErrCodeSettingsError ), "missing QUIC Datagram support" )
315+ return
316+ }
317+ go func () {
318+ if err := c .receiveDatagrams (); err != nil {
319+ if c .logger != nil {
320+ c .logger .Debug ("receiving datagrams failed" , "error" , err )
321+ }
269322 }
270- // If datagram support was enabled on our side as well as on the server side,
271- // we can expect it to have been negotiated both on the transport and on the HTTP/3 layer.
272- // Note: ConnectionState() will block until the handshake is complete (relevant when using 0-RTT).
273- if c .enableDatagrams && ! c .ConnectionState ().SupportsDatagrams {
274- c .CloseWithError (quic .ApplicationErrorCode (ErrCodeSettingsError ), "missing QUIC Datagram support" )
323+ }()
324+ }
325+
326+ // we don't support server push, hence we don't expect any GOAWAY frames from the client
327+ if c .perspective == PerspectiveServer {
328+ return
329+ }
330+
331+ for {
332+ f , err := fp .ParseNext ()
333+ if err != nil {
334+ var serr * quic.StreamError
335+ if err == io .EOF || errors .As (err , & serr ) {
336+ c .Connection .CloseWithError (quic .ApplicationErrorCode (ErrCodeClosedCriticalStream ), "" )
275337 return
276338 }
277- go func () {
278- if err := c .receiveDatagrams (); err != nil {
279- if c .logger != nil {
280- c .logger .Debug ("receiving datagrams failed" , "error" , err )
281- }
282- }
283- }()
284- }(str )
339+ c .Connection .CloseWithError (quic .ApplicationErrorCode (ErrCodeFrameError ), "" )
340+ return
341+ }
342+ // GOAWAY is the only frame allowed at this point:
343+ // * unexpected frames are ignored by the frame parser
344+ // * we don't support any extension that might add support for more frames
345+ goaway , ok := f .(* goAwayFrame )
346+ if ! ok {
347+ c .Connection .CloseWithError (quic .ApplicationErrorCode (ErrCodeFrameUnexpected ), "" )
348+ return
349+ }
350+ if goaway .StreamID % 4 != 0 { // client-initiated, bidirectional streams
351+ c .Connection .CloseWithError (quic .ApplicationErrorCode (ErrCodeIDError ), "" )
352+ return
353+ }
354+ c .streamMx .Lock ()
355+ if c .maxStreamID != InvalidStreamID && goaway .StreamID > c .maxStreamID {
356+ c .streamMx .Unlock ()
357+ c .Connection .CloseWithError (quic .ApplicationErrorCode (ErrCodeIDError ), "" )
358+ return
359+ }
360+ c .maxStreamID = goaway .StreamID
361+ hasActiveStreams := len (c .streams ) > 0
362+ c .streamMx .Unlock ()
363+
364+ // immediately close the connection if there are currently no active requests
365+ if ! hasActiveStreams {
366+ c .CloseWithError (quic .ApplicationErrorCode (ErrCodeNoError ), "" )
367+ return
368+ }
285369 }
286370}
287371
@@ -311,11 +395,10 @@ func (c *connection) receiveDatagrams() error {
311395 streamID := quic .StreamID (4 * quarterStreamID )
312396 c .streamMx .Lock ()
313397 dg , ok := c .streams [streamID ]
398+ c .streamMx .Unlock ()
314399 if ! ok {
315- c .streamMx .Unlock ()
316- return nil
400+ continue
317401 }
318- c .streamMx .Unlock ()
319402 dg .enqueue (b [n :])
320403 }
321404}
0 commit comments