@@ -213,6 +213,7 @@ func (shard *ringShard) Vote(up bool) bool {
213
213
type ringSharding struct {
214
214
opt * RingOptions
215
215
216
+ addrsMu sync.Mutex
216
217
mu sync.RWMutex
217
218
shards * ringShards
218
219
closed bool
@@ -245,46 +246,64 @@ func (c *ringSharding) OnNewNode(fn func(rdb *Client)) {
245
246
// decrease number of shards, that you use. It will reuse shards that
246
247
// existed before and close the ones that will not be used anymore.
247
248
func (c * ringSharding ) SetAddrs (addrs map [string ]string ) {
248
- c .mu .Lock ()
249
+ c .addrsMu .Lock ()
250
+ defer c .addrsMu .Unlock ()
249
251
252
+ closeShards := func (shards map [string ]* ringShard ) {
253
+ for addr , shard := range shards {
254
+ if err := shard .Client .Close (); err != nil {
255
+ internal .Logger .Printf (context .Background (), "shard.Close %s failed: %s" , addr , err )
256
+ }
257
+ }
258
+ }
259
+
260
+ c .mu .RLock ()
250
261
if c .closed {
251
- c .mu .Unlock ()
262
+ c .mu .RUnlock ()
252
263
return
253
264
}
265
+ shards := c .shards
266
+ c .mu .RUnlock ()
254
267
255
- shards , cleanup := c .newRingShards (addrs , c .shards )
268
+ shards , created , unused := c .newRingShards (addrs , shards )
269
+
270
+ c .mu .Lock ()
271
+ if c .closed {
272
+ closeShards (created )
273
+ c .mu .Unlock ()
274
+ return
275
+ }
256
276
c .shards = shards
257
277
c .mu .Unlock ()
258
278
259
279
c .rebalance ()
260
- cleanup ( )
280
+ closeShards ( unused )
261
281
}
262
282
263
283
func (c * ringSharding ) newRingShards (
264
284
addrs map [string ]string , existingShards * ringShards ,
265
- ) (* ringShards , func () ) {
266
- shardMap : = make (map [string ]* ringShard ) // indexed by addr
267
- unusedShards : = make (map [string ]* ringShard ) // indexed by addr
285
+ ) (shards * ringShards , created , unused map [ string ] * ringShard ) {
286
+ created = make (map [string ]* ringShard )
287
+ unused = make (map [string ]* ringShard )
268
288
269
289
if existingShards != nil {
270
290
for _ , shard := range existingShards .list {
271
- addr := shard .Client .opt .Addr
272
- shardMap [addr ] = shard
273
- unusedShards [addr ] = shard
291
+ unused [shard .addr ] = shard
274
292
}
275
293
}
276
294
277
- shards : = & ringShards {
278
- m : make (map [string ]* ringShard ),
295
+ shards = & ringShards {
296
+ m : make (map [string ]* ringShard , len ( addrs ) ),
279
297
}
280
298
281
299
for name , addr := range addrs {
282
- if shard , ok := shardMap [addr ]; ok {
300
+ if shard , ok := unused [addr ]; ok {
283
301
shards .m [name ] = shard
284
- delete (unusedShards , addr )
302
+ delete (unused , addr )
285
303
} else {
286
304
shard := newRingShard (c .opt , addr )
287
305
shards .m [name ] = shard
306
+ created [addr ] = shard
288
307
289
308
for _ , fn := range c .onNewNode {
290
309
fn (shard .Client )
@@ -296,13 +315,7 @@ func (c *ringSharding) newRingShards(
296
315
shards .list = append (shards .list , shard )
297
316
}
298
317
299
- return shards , func () {
300
- for addr , shard := range unusedShards {
301
- if err := shard .Client .Close (); err != nil {
302
- internal .Logger .Printf (context .Background (), "shard.Close %s failed: %s" , addr , err )
303
- }
304
- }
305
- }
318
+ return
306
319
}
307
320
308
321
func (c * ringSharding ) List () []* ringShard {
0 commit comments