Skip to content

Commit deda3df

Browse files
committed
Fix the client connection is closing issue in robustness test
Signed-off-by: Chun-Hung Tseng <henrytseng@google.com>
1 parent 926f4c1 commit deda3df

File tree

2 files changed

+149
-2
lines changed

2 files changed

+149
-2
lines changed

‎tests/robustness/client/client.go‎

Lines changed: 140 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ import (
3030
"go.etcd.io/etcd/tests/v3/robustness/report"
3131
)
3232

33+
var (
34+
errClientIsClosed = errors.New("client is closed")
35+
)
36+
3337
// RecordingClient provides a semi-etcd client (different interface than
3438
// clientv3.Client) that records all the requests and responses made. Doesn't
3539
// allow for concurrent requests to conform to model.AppendableHistory requirements.
@@ -45,6 +49,9 @@ type RecordingClient struct {
4549
// mux ensures order of request appending.
4650
kvMux sync.Mutex
4751
kvOperations *model.AppendableHistory
52+
53+
isClosed bool
54+
sync.Mutex
4855
}
4956

5057
var _ clientv3.KV = (*RecordingClient)(nil)
@@ -69,11 +76,28 @@ func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time
6976
client: *cc,
7077
kvOperations: model.NewAppendableHistory(ids),
7178
baseTime: baseTime,
79+
isClosed: false,
7280
}, nil
7381
}
7482

7583
func (c *RecordingClient) Close() error {
76-
return c.client.Close()
84+
c.Lock()
85+
defer c.Unlock()
86+
if c.IsClosed() {
87+
return nil
88+
}
89+
90+
err := c.client.Close()
91+
if err != nil {
92+
c.isClosed = true
93+
}
94+
return err
95+
}
96+
97+
func (c *RecordingClient) IsClosed() bool {
98+
c.Lock()
99+
defer c.Unlock()
100+
return c.isClosed
77101
}
78102

79103
func (c *RecordingClient) Report() report.ClientReport {
@@ -89,11 +113,23 @@ func (c *RecordingClient) Do(ctx context.Context, op clientv3.Op) (clientv3.OpRe
89113
}
90114

91115
func (c *RecordingClient) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
116+
c.Lock()
117+
defer c.Unlock()
118+
if c.isClosed {
119+
return nil, errClientIsClosed
120+
}
121+
92122
op := clientv3.OpGet(key, opts...)
93123
return c.Range(ctx, key, string(op.RangeBytes()), op.Rev(), op.Limit())
94124
}
95125

96126
func (c *RecordingClient) Range(ctx context.Context, start, end string, revision, limit int64) (*clientv3.GetResponse, error) {
127+
c.Lock()
128+
defer c.Unlock()
129+
if c.isClosed {
130+
return nil, errClientIsClosed
131+
}
132+
97133
ops := []clientv3.OpOption{}
98134
if end != "" {
99135
ops = append(ops, clientv3.WithRange(end))
@@ -114,6 +150,12 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision
114150
}
115151

116152
func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clientv3.OpOption) (*clientv3.PutResponse, error) {
153+
c.Lock()
154+
defer c.Unlock()
155+
if c.isClosed {
156+
return nil, errClientIsClosed
157+
}
158+
117159
c.kvMux.Lock()
118160
defer c.kvMux.Unlock()
119161
callTime := time.Since(c.baseTime)
@@ -124,6 +166,12 @@ func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clien
124166
}
125167

126168
func (c *RecordingClient) Delete(ctx context.Context, key string, _ ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
169+
c.Lock()
170+
defer c.Unlock()
171+
if c.isClosed {
172+
return nil, errClientIsClosed
173+
}
174+
127175
c.kvMux.Lock()
128176
defer c.kvMux.Unlock()
129177
callTime := time.Since(c.baseTime)
@@ -172,10 +220,22 @@ func (w *wrappedTxn) Commit() (*clientv3.TxnResponse, error) {
172220
}
173221

174222
func (c *RecordingClient) Txn(ctx context.Context) clientv3.Txn {
223+
c.Lock()
224+
defer c.Unlock()
225+
if c.isClosed {
226+
return nil
227+
}
228+
175229
return &wrappedTxn{txn: c.client.Txn(ctx), c: c}
176230
}
177231

178232
func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) {
233+
c.Lock()
234+
defer c.Unlock()
235+
if c.isClosed {
236+
return nil, errClientIsClosed
237+
}
238+
179239
c.kvMux.Lock()
180240
defer c.kvMux.Unlock()
181241
callTime := time.Since(c.baseTime)
@@ -186,6 +246,12 @@ func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.
186246
}
187247

188248
func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clientv3.LeaseRevokeResponse, error) {
249+
c.Lock()
250+
defer c.Unlock()
251+
if c.isClosed {
252+
return nil, errClientIsClosed
253+
}
254+
189255
c.kvMux.Lock()
190256
defer c.kvMux.Unlock()
191257
callTime := time.Since(c.baseTime)
@@ -196,6 +262,12 @@ func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clie
196262
}
197263

198264
func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseID int64) (*clientv3.PutResponse, error) {
265+
c.Lock()
266+
defer c.Unlock()
267+
if c.isClosed {
268+
return nil, errClientIsClosed
269+
}
270+
199271
opts := clientv3.WithLease(clientv3.LeaseID(leaseID))
200272
c.kvMux.Lock()
201273
defer c.kvMux.Unlock()
@@ -207,6 +279,12 @@ func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value st
207279
}
208280

209281
func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentResponse, error) {
282+
c.Lock()
283+
defer c.Unlock()
284+
if c.isClosed {
285+
return nil, errClientIsClosed
286+
}
287+
210288
c.kvMux.Lock()
211289
defer c.kvMux.Unlock()
212290
callTime := time.Since(c.baseTime)
@@ -217,6 +295,12 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
217295
}
218296

219297
func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3.CompactOption) (*clientv3.CompactResponse, error) {
298+
c.Lock()
299+
defer c.Unlock()
300+
if c.isClosed {
301+
return nil, errClientIsClosed
302+
}
303+
220304
c.kvMux.Lock()
221305
defer c.kvMux.Unlock()
222306
callTime := time.Since(c.baseTime)
@@ -227,48 +311,90 @@ func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3.
227311
}
228312

229313
func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) {
314+
c.Lock()
315+
defer c.Unlock()
316+
if c.isClosed {
317+
return nil, errClientIsClosed
318+
}
319+
230320
c.kvMux.Lock()
231321
defer c.kvMux.Unlock()
232322
resp, err := c.client.MemberList(ctx, opts...)
233323
return resp, err
234324
}
235325

236326
func (c *RecordingClient) MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
327+
c.Lock()
328+
defer c.Unlock()
329+
if c.isClosed {
330+
return nil, errClientIsClosed
331+
}
332+
237333
c.kvMux.Lock()
238334
defer c.kvMux.Unlock()
239335
resp, err := c.client.MemberAdd(ctx, peerAddrs)
240336
return resp, err
241337
}
242338

243339
func (c *RecordingClient) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
340+
c.Lock()
341+
defer c.Unlock()
342+
if c.isClosed {
343+
return nil, errClientIsClosed
344+
}
345+
244346
c.kvMux.Lock()
245347
defer c.kvMux.Unlock()
246348
resp, err := c.client.MemberAddAsLearner(ctx, peerAddrs)
247349
return resp, err
248350
}
249351

250352
func (c *RecordingClient) MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) {
353+
c.Lock()
354+
defer c.Unlock()
355+
if c.isClosed {
356+
return nil, errClientIsClosed
357+
}
358+
251359
c.kvMux.Lock()
252360
defer c.kvMux.Unlock()
253361
resp, err := c.client.MemberRemove(ctx, id)
254362
return resp, err
255363
}
256364

257365
func (c *RecordingClient) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*clientv3.MemberUpdateResponse, error) {
366+
c.Lock()
367+
defer c.Unlock()
368+
if c.isClosed {
369+
return nil, errClientIsClosed
370+
}
371+
258372
c.kvMux.Lock()
259373
defer c.kvMux.Unlock()
260374
resp, err := c.client.MemberUpdate(ctx, id, peerAddrs)
261375
return resp, err
262376
}
263377

264378
func (c *RecordingClient) MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) {
379+
c.Lock()
380+
defer c.Unlock()
381+
if c.isClosed {
382+
return nil, errClientIsClosed
383+
}
384+
265385
c.kvMux.Lock()
266386
defer c.kvMux.Unlock()
267387
resp, err := c.client.MemberPromote(ctx, id)
268388
return resp, err
269389
}
270390

271391
func (c *RecordingClient) Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) {
392+
c.Lock()
393+
defer c.Unlock()
394+
if c.isClosed {
395+
return nil, errClientIsClosed
396+
}
397+
272398
c.kvMux.Lock()
273399
defer c.kvMux.Unlock()
274400
resp, err := c.client.Status(ctx, endpoint)
@@ -280,6 +406,12 @@ func (c *RecordingClient) Endpoints() []string {
280406
}
281407

282408
func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan {
409+
c.Lock()
410+
defer c.Unlock()
411+
if c.isClosed {
412+
return nil
413+
}
414+
283415
request := model.WatchRequest{
284416
Key: key,
285417
Revision: rev,
@@ -333,6 +465,12 @@ func (c *RecordingClient) watch(ctx context.Context, request model.WatchRequest)
333465
}
334466

335467
func (c *RecordingClient) RequestProgress(ctx context.Context) error {
468+
c.Lock()
469+
defer c.Unlock()
470+
if c.isClosed {
471+
return errClientIsClosed
472+
}
473+
336474
return c.client.RequestProgress(ctx)
337475
}
338476

@@ -434,7 +572,7 @@ func (cs *ClientSet) close() {
434572
return
435573
}
436574
for _, c := range cs.clients {
437-
c.Close()
575+
_ = c.Close()
438576
}
439577
cs.closed = true
440578
}

‎tests/robustness/client/watch.go‎

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ resetWatch:
103103
return nil
104104
}
105105
watch := c.Watch(ctx, "", lastRevision+1, true, true, false)
106+
if watch == nil {
107+
return nil
108+
}
106109
for {
107110
select {
108111
case revision, ok := <-maxRevisionChan:
@@ -161,13 +164,19 @@ func openWatchPeriodically(ctx context.Context, g *errgroup.Group, c *RecordingC
161164
g.Go(func() error {
162165
resp, err := c.Get(ctx, "/key")
163166
if err != nil {
167+
if errors.Is(err, errClientIsClosed) {
168+
return nil
169+
}
164170
return err
165171
}
166172
rev := resp.Header.Revision + backgroundWatchConfig.RevisionOffset
167173

168174
watchCtx, cancel := context.WithCancel(ctx)
169175
defer cancel()
170176
w := c.Watch(watchCtx, "", rev, true, true, true)
177+
if w == nil {
178+
return nil
179+
}
171180
for {
172181
select {
173182
case <-ctx.Done():

0 commit comments

Comments
 (0)