Skip to content

Commit f2f64eb

Browse files
fraenkelneild
authored andcommitted
http2: Send WindowUpdates when remaining bytes are below a threshold
Rather than send a WindowUpdate on every chunk of bytes read, allow them to collect until we go past half the configured window size. Once the threshold is reached, send a single WindowUpdate to reset the amount back to the maximum amount configured. Fixes golang/go#28732 Change-Id: I177f962ee0a9b8daa1c4817e0ab7698e828bad96 Reviewed-on: https://go-review.googlesource.com/c/net/+/150197 TryBot-Result: Gopher Robot <gobot@golang.org> Auto-Submit: Damien Neil <dneil@google.com> Reviewed-by: Cherry Mui <cherryyz@google.com> Reviewed-by: Damien Neil <dneil@google.com> Run-TryBot: Damien Neil <dneil@google.com>
1 parent ca03788 commit f2f64eb

File tree

2 files changed

+66
-35
lines changed

2 files changed

+66
-35
lines changed

‎http2/server.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -869,9 +869,7 @@ func (sc *serverConn) serve() {
869869

870870
// Each connection starts with initialWindowSize inflow tokens.
871871
// If a higher value is configured, we add more tokens.
872-
if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
873-
sc.sendWindowUpdate(nil, int(diff))
874-
}
872+
sc.sendWindowUpdate(nil)
875873

876874
if err := sc.readPreface(); err != nil {
877875
sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
@@ -1588,7 +1586,7 @@ func (sc *serverConn) closeStream(st *stream, err error) {
15881586
if p := st.body; p != nil {
15891587
// Return any buffered unread bytes worth of conn-level flow control.
15901588
// See golang.org/issue/16481
1591-
sc.sendWindowUpdate(nil, p.Len())
1589+
sc.sendWindowUpdate(nil)
15921590

15931591
p.CloseWithError(err)
15941592
}
@@ -1736,7 +1734,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
17361734
// sendWindowUpdate, which also schedules sending the
17371735
// frames.
17381736
sc.inflow.take(int32(f.Length))
1739-
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
1737+
sc.sendWindowUpdate(nil) // conn-level
17401738

17411739
if st != nil && st.resetQueued {
17421740
// Already have a stream error in flight. Don't send another.
@@ -1754,7 +1752,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
17541752
return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
17551753
}
17561754
sc.inflow.take(int32(f.Length))
1757-
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
1755+
sc.sendWindowUpdate(nil) // conn-level
17581756

17591757
st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
17601758
// RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
@@ -1772,7 +1770,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
17721770
if len(data) > 0 {
17731771
wrote, err := st.body.Write(data)
17741772
if err != nil {
1775-
sc.sendWindowUpdate(nil, int(f.Length)-wrote)
1773+
sc.sendWindowUpdate32(nil, int32(f.Length)-int32(wrote))
17761774
return sc.countError("body_write_err", streamError(id, ErrCodeStreamClosed))
17771775
}
17781776
if wrote != len(data) {
@@ -2324,17 +2322,32 @@ func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
23242322

23252323
func (sc *serverConn) noteBodyRead(st *stream, n int) {
23262324
sc.serveG.check()
2327-
sc.sendWindowUpdate(nil, n) // conn-level
2325+
sc.sendWindowUpdate(nil) // conn-level
23282326
if st.state != stateHalfClosedRemote && st.state != stateClosed {
23292327
// Don't send this WINDOW_UPDATE if the stream is closed
23302328
// remotely.
2331-
sc.sendWindowUpdate(st, n)
2329+
sc.sendWindowUpdate(st)
23322330
}
23332331
}
23342332

23352333
// st may be nil for conn-level
2336-
func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
2334+
func (sc *serverConn) sendWindowUpdate(st *stream) {
23372335
sc.serveG.check()
2336+
2337+
var n int32
2338+
if st == nil {
2339+
if avail, windowSize := sc.inflow.available(), sc.srv.initialConnRecvWindowSize(); avail > windowSize/2 {
2340+
return
2341+
} else {
2342+
n = windowSize - avail
2343+
}
2344+
} else {
2345+
if avail, windowSize := st.inflow.available(), sc.srv.initialStreamRecvWindowSize(); avail > windowSize/2 {
2346+
return
2347+
} else {
2348+
n = windowSize - avail
2349+
}
2350+
}
23382351
// "The legal range for the increment to the flow control
23392352
// window is 1 to 2^31-1 (2,147,483,647) octets."
23402353
// A Go Read call on 64-bit machines could in theory read

‎http2/server_test.go

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -809,9 +809,6 @@ func TestServer_Request_Post_Body_ContentLength_TooSmall(t *testing.T) {
809809
EndHeaders: true,
810810
})
811811
st.writeData(1, true, []byte("12345"))
812-
// Return flow control bytes back, since the data handler closed
813-
// the stream.
814-
st.wantWindowUpdate(0, 5)
815812
})
816813
}
817814

@@ -1247,6 +1244,41 @@ func TestServer_Handler_Sends_WindowUpdate(t *testing.T) {
12471244

12481245
st.greet()
12491246

1247+
st.writeHeaders(HeadersFrameParam{
1248+
StreamID: 1, // clients send odd numbers
1249+
BlockFragment: st.encodeHeader(":method", "POST"),
1250+
EndStream: false, // data coming
1251+
EndHeaders: true,
1252+
})
1253+
updateSize := 1 << 20 / 2 // the conn & stream size before a WindowUpdate
1254+
st.writeData(1, false, bytes.Repeat([]byte("a"), updateSize-10))
1255+
st.writeData(1, false, bytes.Repeat([]byte("b"), 10))
1256+
puppet.do(readBodyHandler(t, strings.Repeat("a", updateSize-10)))
1257+
puppet.do(readBodyHandler(t, strings.Repeat("b", 10)))
1258+
1259+
st.wantWindowUpdate(0, uint32(updateSize))
1260+
st.wantWindowUpdate(1, uint32(updateSize))
1261+
1262+
st.writeData(1, false, bytes.Repeat([]byte("a"), updateSize-10))
1263+
st.writeData(1, true, bytes.Repeat([]byte("c"), 15)) // END_STREAM here
1264+
puppet.do(readBodyHandler(t, strings.Repeat("a", updateSize-10)))
1265+
puppet.do(readBodyHandler(t, strings.Repeat("c", 15)))
1266+
1267+
st.wantWindowUpdate(0, uint32(updateSize+5))
1268+
}
1269+
1270+
func TestServer_Handler_Sends_WindowUpdate_SmallStream(t *testing.T) {
1271+
puppet := newHandlerPuppet()
1272+
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
1273+
puppet.act(w, r)
1274+
}, func(s *Server) {
1275+
s.MaxUploadBufferPerStream = 6
1276+
})
1277+
defer st.Close()
1278+
defer puppet.done()
1279+
1280+
st.greet()
1281+
12501282
st.writeHeaders(HeadersFrameParam{
12511283
StreamID: 1, // clients send odd numbers
12521284
BlockFragment: st.encodeHeader(":method", "POST"),
@@ -1255,18 +1287,14 @@ func TestServer_Handler_Sends_WindowUpdate(t *testing.T) {
12551287
})
12561288
st.writeData(1, false, []byte("abcdef"))
12571289
puppet.do(readBodyHandler(t, "abc"))
1258-
st.wantWindowUpdate(0, 3)
1259-
st.wantWindowUpdate(1, 3)
1290+
puppet.do(readBodyHandler(t, "d"))
1291+
puppet.do(readBodyHandler(t, "ef"))
12601292

1261-
puppet.do(readBodyHandler(t, "def"))
1262-
st.wantWindowUpdate(0, 3)
1263-
st.wantWindowUpdate(1, 3)
1293+
st.wantWindowUpdate(1, 6)
12641294

12651295
st.writeData(1, true, []byte("ghijkl")) // END_STREAM here
12661296
puppet.do(readBodyHandler(t, "ghi"))
12671297
puppet.do(readBodyHandler(t, "jkl"))
1268-
st.wantWindowUpdate(0, 3)
1269-
st.wantWindowUpdate(0, 3) // no more stream-level, since END_STREAM
12701298
}
12711299

12721300
// the version of the TestServer_Handler_Sends_WindowUpdate with padding.
@@ -1295,12 +1323,7 @@ func TestServer_Handler_Sends_WindowUpdate_Padding(t *testing.T) {
12951323
st.wantWindowUpdate(1, 5)
12961324

12971325
puppet.do(readBodyHandler(t, "abc"))
1298-
st.wantWindowUpdate(0, 3)
1299-
st.wantWindowUpdate(1, 3)
1300-
13011326
puppet.do(readBodyHandler(t, "def"))
1302-
st.wantWindowUpdate(0, 3)
1303-
st.wantWindowUpdate(1, 3)
13041327
}
13051328

13061329
func TestServer_Send_GoAway_After_Bogus_WindowUpdate(t *testing.T) {
@@ -2296,8 +2319,6 @@ func TestServer_Response_Automatic100Continue(t *testing.T) {
22962319
// gigantic and/or sensitive "foo" payload now.
22972320
st.writeData(1, true, []byte(msg))
22982321

2299-
st.wantWindowUpdate(0, uint32(len(msg)))
2300-
23012322
hf = st.wantHeaders()
23022323
if hf.StreamEnded() {
23032324
t.Fatal("expected data to follow")
@@ -2485,9 +2506,6 @@ func TestServer_NoCrash_HandlerClose_Then_ClientClose(t *testing.T) {
24852506
// it did before.
24862507
st.writeData(1, true, []byte("foo"))
24872508

2488-
// Get our flow control bytes back, since the handler didn't get them.
2489-
st.wantWindowUpdate(0, uint32(len("foo")))
2490-
24912509
// Sent after a peer sends data anyway (admittedly the
24922510
// previous RST_STREAM might've still been in-flight),
24932511
// but they'll get the more friendly 'cancel' code
@@ -3906,7 +3924,6 @@ func TestServer_Rejects_TooSmall(t *testing.T) {
39063924
EndHeaders: true,
39073925
})
39083926
st.writeData(1, true, []byte("12345"))
3909-
st.wantWindowUpdate(0, 5)
39103927
st.wantRSTStream(1, ErrCodeProtocol)
39113928
})
39123929
}
@@ -4199,7 +4216,6 @@ func TestServerWindowUpdateOnBodyClose(t *testing.T) {
41994216
st.writeData(1, false, []byte(content[5:]))
42004217
blockCh <- true
42014218

4202-
increments := len(content)
42034219
for {
42044220
f, err := st.readFrame()
42054221
if err == io.EOF {
@@ -4208,10 +4224,12 @@ func TestServerWindowUpdateOnBodyClose(t *testing.T) {
42084224
if err != nil {
42094225
t.Fatal(err)
42104226
}
4227+
if rs, ok := f.(*RSTStreamFrame); ok && rs.StreamID == 1 {
4228+
break
4229+
}
42114230
if wu, ok := f.(*WindowUpdateFrame); ok && wu.StreamID == 0 {
4212-
increments -= int(wu.Increment)
4213-
if increments == 0 {
4214-
break
4231+
if e, a := uint32(3), wu.Increment; e != a {
4232+
t.Errorf("Increment=%d, want %d", a, e)
42154233
}
42164234
}
42174235
}

0 commit comments

Comments
 (0)