@@ -30,6 +30,10 @@ import (
3030 "go.etcd.io/etcd/tests/v3/robustness/report"
3131)
3232
33+ var (
34+ errClientIsClosed = errors .New ("client is closed" )
35+ )
36+
3337// RecordingClient provides a semi-etcd client (different interface than
3438// clientv3.Client) that records all the requests and responses made. Doesn't
3539// allow for concurrent requests to conform to model.AppendableHistory requirements.
@@ -45,6 +49,8 @@ type RecordingClient struct {
4549 // mux ensures order of request appending.
4650 kvMux sync.Mutex
4751 kvOperations * model.AppendableHistory
52+
53+ isClosed bool
4854}
4955
5056var _ clientv3.KV = (* RecordingClient )(nil )
@@ -69,11 +75,20 @@ func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time
6975 client : * cc ,
7076 kvOperations : model .NewAppendableHistory (ids ),
7177 baseTime : baseTime ,
78+ isClosed : false ,
7279 }, nil
7380}
7481
7582func (c * RecordingClient ) Close () error {
76- return c .client .Close ()
83+ c .kvMux .Lock ()
84+ defer c .kvMux .Unlock ()
85+ if c .isClosed {
86+ return nil
87+ }
88+
89+ err := c .client .Close ()
90+ c .isClosed = true // if we set to true only if there is no error, we need to handle the retry in defer
91+ return err
7792}
7893
7994func (c * RecordingClient ) Report () report.ClientReport {
@@ -94,6 +109,12 @@ func (c *RecordingClient) Get(ctx context.Context, key string, opts ...clientv3.
94109}
95110
96111func (c * RecordingClient ) Range (ctx context.Context , start , end string , revision , limit int64 ) (* clientv3.GetResponse , error ) {
112+ c .kvMux .Lock ()
113+ defer c .kvMux .Unlock ()
114+ if c .isClosed {
115+ return nil , errClientIsClosed
116+ }
117+
97118 ops := []clientv3.OpOption {}
98119 if end != "" {
99120 ops = append (ops , clientv3 .WithRange (end ))
@@ -104,8 +125,6 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision
104125 if limit != 0 {
105126 ops = append (ops , clientv3 .WithLimit (limit ))
106127 }
107- c .kvMux .Lock ()
108- defer c .kvMux .Unlock ()
109128 callTime := time .Since (c .baseTime )
110129 resp , err := c .client .Get (ctx , start , ops ... )
111130 returnTime := time .Since (c .baseTime )
@@ -116,6 +135,10 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision
116135func (c * RecordingClient ) Put (ctx context.Context , key , value string , _ ... clientv3.OpOption ) (* clientv3.PutResponse , error ) {
117136 c .kvMux .Lock ()
118137 defer c .kvMux .Unlock ()
138+ if c .isClosed {
139+ return nil , errClientIsClosed
140+ }
141+
119142 callTime := time .Since (c .baseTime )
120143 resp , err := c .client .Put (ctx , key , value )
121144 returnTime := time .Since (c .baseTime )
@@ -126,6 +149,10 @@ func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clien
126149func (c * RecordingClient ) Delete (ctx context.Context , key string , _ ... clientv3.OpOption ) (* clientv3.DeleteResponse , error ) {
127150 c .kvMux .Lock ()
128151 defer c .kvMux .Unlock ()
152+ if c .isClosed {
153+ return nil , errClientIsClosed
154+ }
155+
129156 callTime := time .Since (c .baseTime )
130157 resp , err := c .client .Delete (ctx , key )
131158 returnTime := time .Since (c .baseTime )
@@ -172,12 +199,22 @@ func (w *wrappedTxn) Commit() (*clientv3.TxnResponse, error) {
172199}
173200
174201func (c * RecordingClient ) Txn (ctx context.Context ) clientv3.Txn {
202+ c .kvMux .Lock ()
203+ defer c .kvMux .Unlock ()
204+ if c .isClosed {
205+ return nil
206+ }
207+
175208 return & wrappedTxn {txn : c .client .Txn (ctx ), c : c }
176209}
177210
178211func (c * RecordingClient ) LeaseGrant (ctx context.Context , ttl int64 ) (* clientv3.LeaseGrantResponse , error ) {
179212 c .kvMux .Lock ()
180213 defer c .kvMux .Unlock ()
214+ if c .isClosed {
215+ return nil , errClientIsClosed
216+ }
217+
181218 callTime := time .Since (c .baseTime )
182219 resp , err := c .client .Lease .Grant (ctx , ttl )
183220 returnTime := time .Since (c .baseTime )
@@ -188,6 +225,10 @@ func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.
188225func (c * RecordingClient ) LeaseRevoke (ctx context.Context , leaseID int64 ) (* clientv3.LeaseRevokeResponse , error ) {
189226 c .kvMux .Lock ()
190227 defer c .kvMux .Unlock ()
228+ if c .isClosed {
229+ return nil , errClientIsClosed
230+ }
231+
191232 callTime := time .Since (c .baseTime )
192233 resp , err := c .client .Lease .Revoke (ctx , clientv3 .LeaseID (leaseID ))
193234 returnTime := time .Since (c .baseTime )
@@ -196,9 +237,13 @@ func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clie
196237}
197238
198239func (c * RecordingClient ) PutWithLease (ctx context.Context , key string , value string , leaseID int64 ) (* clientv3.PutResponse , error ) {
199- opts := clientv3 .WithLease (clientv3 .LeaseID (leaseID ))
200240 c .kvMux .Lock ()
201241 defer c .kvMux .Unlock ()
242+ if c .isClosed {
243+ return nil , errClientIsClosed
244+ }
245+
246+ opts := clientv3 .WithLease (clientv3 .LeaseID (leaseID ))
202247 callTime := time .Since (c .baseTime )
203248 resp , err := c .client .Put (ctx , key , value , opts )
204249 returnTime := time .Since (c .baseTime )
@@ -209,6 +254,10 @@ func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value st
209254func (c * RecordingClient ) Defragment (ctx context.Context ) (* clientv3.DefragmentResponse , error ) {
210255 c .kvMux .Lock ()
211256 defer c .kvMux .Unlock ()
257+ if c .isClosed {
258+ return nil , errClientIsClosed
259+ }
260+
212261 callTime := time .Since (c .baseTime )
213262 resp , err := c .client .Defragment (ctx , c .client .Endpoints ()[0 ])
214263 returnTime := time .Since (c .baseTime )
@@ -219,6 +268,10 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
219268func (c * RecordingClient ) Compact (ctx context.Context , rev int64 , _ ... clientv3.CompactOption ) (* clientv3.CompactResponse , error ) {
220269 c .kvMux .Lock ()
221270 defer c .kvMux .Unlock ()
271+ if c .isClosed {
272+ return nil , errClientIsClosed
273+ }
274+
222275 callTime := time .Since (c .baseTime )
223276 resp , err := c .client .Compact (ctx , rev )
224277 returnTime := time .Since (c .baseTime )
@@ -229,48 +282,76 @@ func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3.
229282func (c * RecordingClient ) MemberList (ctx context.Context , opts ... clientv3.OpOption ) (* clientv3.MemberListResponse , error ) {
230283 c .kvMux .Lock ()
231284 defer c .kvMux .Unlock ()
285+ if c .isClosed {
286+ return nil , errClientIsClosed
287+ }
288+
232289 resp , err := c .client .MemberList (ctx , opts ... )
233290 return resp , err
234291}
235292
236293func (c * RecordingClient ) MemberAdd (ctx context.Context , peerAddrs []string ) (* clientv3.MemberAddResponse , error ) {
237294 c .kvMux .Lock ()
238295 defer c .kvMux .Unlock ()
296+ if c .isClosed {
297+ return nil , errClientIsClosed
298+ }
299+
239300 resp , err := c .client .MemberAdd (ctx , peerAddrs )
240301 return resp , err
241302}
242303
243304func (c * RecordingClient ) MemberAddAsLearner (ctx context.Context , peerAddrs []string ) (* clientv3.MemberAddResponse , error ) {
244305 c .kvMux .Lock ()
245306 defer c .kvMux .Unlock ()
307+ if c .isClosed {
308+ return nil , errClientIsClosed
309+ }
310+
246311 resp , err := c .client .MemberAddAsLearner (ctx , peerAddrs )
247312 return resp , err
248313}
249314
250315func (c * RecordingClient ) MemberRemove (ctx context.Context , id uint64 ) (* clientv3.MemberRemoveResponse , error ) {
251316 c .kvMux .Lock ()
252317 defer c .kvMux .Unlock ()
318+ if c .isClosed {
319+ return nil , errClientIsClosed
320+ }
321+
253322 resp , err := c .client .MemberRemove (ctx , id )
254323 return resp , err
255324}
256325
257326func (c * RecordingClient ) MemberUpdate (ctx context.Context , id uint64 , peerAddrs []string ) (* clientv3.MemberUpdateResponse , error ) {
258327 c .kvMux .Lock ()
259328 defer c .kvMux .Unlock ()
329+ if c .isClosed {
330+ return nil , errClientIsClosed
331+ }
332+
260333 resp , err := c .client .MemberUpdate (ctx , id , peerAddrs )
261334 return resp , err
262335}
263336
264337func (c * RecordingClient ) MemberPromote (ctx context.Context , id uint64 ) (* clientv3.MemberPromoteResponse , error ) {
265338 c .kvMux .Lock ()
266339 defer c .kvMux .Unlock ()
340+ if c .isClosed {
341+ return nil , errClientIsClosed
342+ }
343+
267344 resp , err := c .client .MemberPromote (ctx , id )
268345 return resp , err
269346}
270347
271348func (c * RecordingClient ) Status (ctx context.Context , endpoint string ) (* clientv3.StatusResponse , error ) {
272349 c .kvMux .Lock ()
273350 defer c .kvMux .Unlock ()
351+ if c .isClosed {
352+ return nil , errClientIsClosed
353+ }
354+
274355 resp , err := c .client .Status (ctx , endpoint )
275356 return resp , err
276357}
@@ -280,6 +361,12 @@ func (c *RecordingClient) Endpoints() []string {
280361}
281362
282363func (c * RecordingClient ) Watch (ctx context.Context , key string , rev int64 , withPrefix bool , withProgressNotify bool , withPrevKV bool ) clientv3.WatchChan {
364+ c .kvMux .Lock ()
365+ defer c .kvMux .Unlock ()
366+ if c .isClosed {
367+ return nil
368+ }
369+
283370 request := model.WatchRequest {
284371 Key : key ,
285372 Revision : rev ,
@@ -333,6 +420,12 @@ func (c *RecordingClient) watch(ctx context.Context, request model.WatchRequest)
333420}
334421
335422func (c * RecordingClient ) RequestProgress (ctx context.Context ) error {
423+ c .kvMux .Lock ()
424+ defer c .kvMux .Unlock ()
425+ if c .isClosed {
426+ return errClientIsClosed
427+ }
428+
336429 return c .client .RequestProgress (ctx )
337430}
338431
@@ -434,7 +527,7 @@ func (cs *ClientSet) close() {
434527 return
435528 }
436529 for _ , c := range cs .clients {
437- c .Close ()
530+ _ = c .Close ()
438531 }
439532 cs .closed = true
440533}
0 commit comments