Skip to content

Commit f486391

Browse files
nikola-jokicneild
authored andcommitted
http2: improved Request.Body.Close not to hold lock on connection
The existing implementation holds a lock on a connection which causes issues on a slow Request.Body.Close call. Unlock before Request.Body.Close call. The abortStream closes the request body after unlocking a mutex. The abortStreamLocked returns reqBody as io.ReadCloser if the caller needs to close the body after unlocking the mutex. Fixes golang/go#52853 Change-Id: I0b74ba5263f65393c0e69e1c645d10c4db048903 Reviewed-on: https://go-review.googlesource.com/c/net/+/424755 Reviewed-by: Damien Neil <dneil@google.com> Run-TryBot: Damien Neil <dneil@google.com> Reviewed-by: Dmitri Shuralyov <dmitshur@google.com> TryBot-Result: Gopher Robot <gobot@golang.org>
1 parent aa73b25 commit f486391

File tree

2 files changed

+88
-6
lines changed

2 files changed

+88
-6
lines changed

‎http2/transport.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -376,25 +376,33 @@ func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error
376376
}
377377

378378
func (cs *clientStream) abortStream(err error) {
379+
var reqBody io.ReadCloser
380+
defer func() {
381+
if reqBody != nil {
382+
reqBody.Close()
383+
}
384+
}()
379385
cs.cc.mu.Lock()
380386
defer cs.cc.mu.Unlock()
381-
cs.abortStreamLocked(err)
387+
reqBody = cs.abortStreamLocked(err)
382388
}
383389

384-
func (cs *clientStream) abortStreamLocked(err error) {
390+
func (cs *clientStream) abortStreamLocked(err error) io.ReadCloser {
385391
cs.abortOnce.Do(func() {
386392
cs.abortErr = err
387393
close(cs.abort)
388394
})
395+
var reqBody io.ReadCloser
389396
if cs.reqBody != nil && !cs.reqBodyClosed {
390-
cs.reqBody.Close()
391397
cs.reqBodyClosed = true
398+
reqBody = cs.reqBody
392399
}
393400
// TODO(dneil): Clean up tests where cs.cc.cond is nil.
394401
if cs.cc.cond != nil {
395402
// Wake up writeRequestBody if it is waiting on flow control.
396403
cs.cc.cond.Broadcast()
397404
}
405+
return reqBody
398406
}
399407

400408
func (cs *clientStream) abortRequestBodyWrite() {
@@ -763,6 +771,12 @@ func (cc *ClientConn) SetDoNotReuse() {
763771
}
764772

765773
func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
774+
var reqBodiesToClose []io.ReadCloser
775+
defer func() {
776+
for _, reqBody := range reqBodiesToClose {
777+
reqBody.Close()
778+
}
779+
}()
766780
cc.mu.Lock()
767781
defer cc.mu.Unlock()
768782

@@ -779,7 +793,10 @@ func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
779793
last := f.LastStreamID
780794
for streamID, cs := range cc.streams {
781795
if streamID > last {
782-
cs.abortStreamLocked(errClientConnGotGoAway)
796+
reqBody := cs.abortStreamLocked(errClientConnGotGoAway)
797+
if reqBody != nil {
798+
reqBodiesToClose = append(reqBodiesToClose, reqBody)
799+
}
783800
}
784801
}
785802
}
@@ -1032,11 +1049,19 @@ func (cc *ClientConn) sendGoAway() error {
10321049
func (cc *ClientConn) closeForError(err error) {
10331050
cc.mu.Lock()
10341051
cc.closed = true
1052+
1053+
var reqBodiesToClose []io.ReadCloser
10351054
for _, cs := range cc.streams {
1036-
cs.abortStreamLocked(err)
1055+
reqBody := cs.abortStreamLocked(err)
1056+
if reqBody != nil {
1057+
reqBodiesToClose = append(reqBodiesToClose, reqBody)
1058+
}
10371059
}
10381060
cc.cond.Broadcast()
10391061
cc.mu.Unlock()
1062+
for _, reqBody := range reqBodiesToClose {
1063+
reqBody.Close()
1064+
}
10401065
cc.closeConn()
10411066
}
10421067

@@ -2084,17 +2109,25 @@ func (rl *clientConnReadLoop) cleanup() {
20842109
err = io.ErrUnexpectedEOF
20852110
}
20862111
cc.closed = true
2112+
2113+
var reqBodiesToClose []io.ReadCloser
20872114
for _, cs := range cc.streams {
20882115
select {
20892116
case <-cs.peerClosed:
20902117
// The server closed the stream before closing the conn,
20912118
// so no need to interrupt it.
20922119
default:
2093-
cs.abortStreamLocked(err)
2120+
reqBody := cs.abortStreamLocked(err)
2121+
if reqBody != nil {
2122+
reqBodiesToClose = append(reqBodiesToClose, reqBody)
2123+
}
20942124
}
20952125
}
20962126
cc.cond.Broadcast()
20972127
cc.mu.Unlock()
2128+
for _, reqBody := range reqBodiesToClose {
2129+
reqBody.Close()
2130+
}
20982131
}
20992132

21002133
// countReadFrameError calls Transport.CountError with a string

‎http2/transport_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5999,3 +5999,52 @@ func testTransportClosesConnAfterGoAway(t *testing.T, lastStream uint32) {
59995999

60006000
ct.run()
60016001
}
6002+
6003+
type slowCloser struct {
6004+
closing chan struct{}
6005+
closed chan struct{}
6006+
}
6007+
6008+
func (r *slowCloser) Read([]byte) (int, error) {
6009+
return 0, io.EOF
6010+
}
6011+
6012+
func (r *slowCloser) Close() error {
6013+
close(r.closing)
6014+
<-r.closed
6015+
return nil
6016+
}
6017+
6018+
func TestTransportSlowClose(t *testing.T) {
6019+
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
6020+
}, optOnlyServer)
6021+
defer st.Close()
6022+
6023+
client := st.ts.Client()
6024+
body := &slowCloser{
6025+
closing: make(chan struct{}),
6026+
closed: make(chan struct{}),
6027+
}
6028+
6029+
reqc := make(chan struct{})
6030+
go func() {
6031+
defer close(reqc)
6032+
res, err := client.Post(st.ts.URL, "text/plain", body)
6033+
if err != nil {
6034+
t.Error(err)
6035+
}
6036+
res.Body.Close()
6037+
}()
6038+
defer func() {
6039+
close(body.closed)
6040+
<-reqc // wait for POST request to finish
6041+
}()
6042+
6043+
<-body.closing // wait for POST request to call body.Close
6044+
// This GET request should not be blocked by the in-progress POST.
6045+
res, err := client.Get(st.ts.URL)
6046+
if err != nil {
6047+
t.Fatal(err)
6048+
}
6049+
res.Body.Close()
6050+
}

0 commit comments

Comments
 (0)