Skip to content

Commit 6004b0d

Browse files
committed
Fixed deadlock in transport
1 parent 552de12 commit 6004b0d

File tree

2 files changed

+51
-3
lines changed

2 files changed

+51
-3
lines changed

‎internal/transport/http2_client.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,16 +1232,23 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
12321232
if upperLimit == 0 { // This is the first GoAway Frame.
12331233
upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
12341234
}
1235+
1236+
activeStreams := make(map[uint32]*Stream)
12351237
for streamID, stream := range t.activeStreams {
1238+
activeStreams[streamID] = stream
1239+
}
1240+
1241+
t.prevGoAwayID = id
1242+
t.mu.Unlock()
1243+
for streamID, stream := range activeStreams {
12361244
if streamID > id && streamID <= upperLimit {
12371245
// The stream was unprocessed by the server.
12381246
atomic.StoreUint32(&stream.unprocessed, 1)
12391247
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
12401248
}
12411249
}
1242-
t.prevGoAwayID = id
1243-
active := len(t.activeStreams)
1244-
t.mu.Unlock()
1250+
1251+
active := len(activeStreams)
12451252
if active == 0 {
12461253
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
12471254
}

‎internal/transport/transport_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2501,3 +2501,44 @@ func (s) TestPeerSetInServerContext(t *testing.T) {
25012501
}
25022502
server.mu.Unlock()
25032503
}
2504+
2505+
// TestGoAwayCloseStreams tests the scenario where a client has many streams
2506+
// created, and the server sends a GOAWAY frame with a stream id less than some
2507+
// of them, while the client is still creating new streams. This should not
2508+
// induce a deadlock.
2509+
func (s) TestGoAwayCloseStreams(t *testing.T) {
2510+
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
2511+
defer cancel()
2512+
defer server.stop()
2513+
defer ct.Close(fmt.Errorf("closed manually by test"))
2514+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2515+
defer cancel()
2516+
for i := 0; i < 5; i++ {
2517+
_, err := ct.NewStream(ctx, &CallHdr{})
2518+
if err != nil {
2519+
t.Fatalf("error creating stream: %v", err)
2520+
}
2521+
}
2522+
2523+
waitWhileTrue(t, func() (bool, error) {
2524+
server.mu.Lock()
2525+
defer server.mu.Unlock()
2526+
2527+
if len(server.conns) == 0 {
2528+
return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
2529+
}
2530+
return false, nil
2531+
})
2532+
2533+
var st *http2Server
2534+
server.mu.Lock()
2535+
for k := range server.conns {
2536+
st = k.(*http2Server)
2537+
}
2538+
server.mu.Unlock()
2539+
2540+
st.framer.fr.WriteGoAway(5, http2.ErrCodeNo, []byte{})
2541+
for i := 0; i < 10; i++ {
2542+
ct.NewStream(ctx, &CallHdr{})
2543+
}
2544+
}

0 commit comments

Comments
 (0)