
Commit cc98e38

Peter Wilhelmsson (2hdddg) authored and committed
Fixes for retry logic
When connecting to any cluster member fails -> sleep and retry.
Upon a retriable cluster error -> sleep and retry until timeout (not only once).
Upon a lost connection -> do not retry if the failure happened during commit.
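Taken together, the new policy behaves roughly like the loop below. This is only a minimal, self-contained sketch of the behaviour described above; the names runOnce and maxRetryTime are illustrative and are not the driver's actual API.

// Sketch of the retry policy (illustrative only, not the driver's code):
// runOnce stands in for one transaction attempt and reports whether the
// failure happened during commit.
package main

import (
	"errors"
	"fmt"
	"time"
)

func runOnce(attempt int) (interface{}, bool, error) {
	// Pretend the first two attempts hit a transient error.
	if attempt < 2 {
		return nil, false, errors.New("transient failure")
	}
	return "ok", false, nil
}

func runRetriable(maxRetryTime time.Duration) (interface{}, error) {
	start := time.Now()
	delay := time.Second
	for attempt := 0; ; attempt++ {
		x, commitFailed, err := runOnce(attempt)
		if err == nil {
			return x, nil
		}
		// Never retry after a failed commit; the server may already have applied it.
		if commitFailed {
			return nil, fmt.Errorf("not retrying, failure during commit: %w", err)
		}
		// Keep retrying until the retry budget is used up, not just once.
		if time.Since(start) > maxRetryTime {
			return nil, fmt.Errorf("gave up after %s: %w", maxRetryTime, err)
		}
		// Sleep with a growing delay before the next attempt.
		time.Sleep(delay)
		delay *= 2
	}
}

func main() {
	x, err := runRetriable(30 * time.Second)
	fmt.Println(x, err)
}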
1 parent: 3fc3a0f · commit: cc98e38

2 files changed (+40, -29 lines)


neo4j/internal/pool/pool.go

Lines changed: 8 additions & 6 deletions
@@ -268,20 +268,22 @@ func (p *Pool) Borrow(ctx context.Context, serverNames []string, wait bool) (Con
         }
     }
 
-    if !wait {
-        return nil, &PoolFull{servers: serverNames}
-    }
-
     // If there are no connections for any of the servers, there is no point in waiting for anything
     // to be returned.
     if !p.anyExistingConnectionsOnServers(serverNames) {
+        p.log.Warnf(p.logId, "No server connection available to any of %v", serverNames)
         if err == nil {
-            err = &PoolTimeout{err: errors.New("No connection to wait for"), servers: serverNames}
+            err = errors.New(fmt.Sprintf("No server connection available to any of %v", serverNames))
         }
-        p.log.Error(p.logId, err)
+        // Intentionally return last error from last connection attempt to make it easier to
+        // see connection errors for users.
         return nil, err
     }
 
+    if !wait {
+        return nil, &PoolFull{servers: serverNames}
+    }
+
     // Wait for a matching connection to be returned from another thread.
     p.queueMut.Lock()
     // Ok, now that we own the queue we can add the item there but between getting the lock

neo4j/session.go

Lines changed: 32 additions & 23 deletions
@@ -235,26 +235,27 @@ func (s *session) beginTransaction(mode db.AccessMode, config *TransactionConfig
     }, nil
 }
 
-func (s *session) runOneTry(mode db.AccessMode, work TransactionWork, config *TransactionConfig) (interface{}, error) {
+func (s *session) runOneTry(mode db.AccessMode, work TransactionWork, config *TransactionConfig) (interface{}, bool, error) {
     tx, err := s.beginTransaction(mode, config)
     if err != nil {
-        return nil, err
+        return nil, false, err
     }
     defer func() {
         tx.Close()
     }()
 
     x, err := work(tx)
     if err != nil {
-        return nil, err
+        return nil, false, err
     }
 
     err = tx.Commit()
     if err != nil {
-        return nil, err
+        // Indicate that Commit failed, not safe to retry network error in this case
+        return nil, true, err
     }
 
-    return x, nil
+    return x, false, nil
 }
 
 func (s *session) runRetriable(
@@ -278,42 +279,52 @@ func (s *session) runRetriable(
     }
 
     var (
-        maxDeadErrors    = s.config.MaxConnectionPoolSize / 2
-        maxClusterErrors = 1
-        throttle         = throttler(s.throttleTime)
-        start            time.Time
+        maxDeadErrors = s.config.MaxConnectionPoolSize / 2
+        throttle      = throttler(s.throttleTime)
+        start         time.Time
     )
     for {
         // Always return the current connection before trying (again)
         s.returnConn()
         s.res = nil
 
-        x, err := s.runOneTry(mode, work, &config)
+        x, commitFailure, err := s.runOneTry(mode, work, &config)
         if err == nil {
             return x, nil
         }
 
         s.log.Debugf(s.logId, "Retriable transaction evaluating error: %s", err)
 
-        // If we failed due to connect problem, just give up since the pool tries really hard
-        if s.conn == nil {
-            s.log.Errorf(s.logId, "Retriable transaction failed due to no available connection: %s", err)
-            return nil, err
-        }
-
         // Check retry timeout
         if start.IsZero() {
             start = s.now()
         }
         if time.Since(start) > s.config.MaxTransactionRetryTime {
-            s.log.Errorf(s.logId, "Retriable transaction failed due to reaching MaxTransactionRetryTime: %s", s.config.MaxTransactionRetryTime.String())
+            s.log.Errorf(s.logId, "Retriable transaction failed due to reaching MaxTransactionRetryTime (%s): %s",
+                s.config.MaxTransactionRetryTime.String(), err)
             return nil, err
         }
 
         // Failed, check cause and determine next action
 
+        // If we failed due to connect problem, wait a bit and retry
+        if s.conn == nil {
+            throttle = throttle.next()
+            d := throttle.delay()
+            s.log.Debugf(s.logId, "Retrying transaction due to no available connection after sleeping for %s", d.String())
+            s.sleep(d)
+            continue
+        }
+
         // If the connection is dead just return the connection, get another and try again, no sleep
+        // Do not do this if the connection died during commit phase since we don't know if we have
+        // succesfully committed or not, might corrupt data otherwise!
         if !s.conn.IsAlive() {
+            if commitFailure {
+                err = errors.New(fmt.Sprintf("Retriable transaction failed due to lost connection during commit: %s", err))
+                s.log.Error(s.logId, err)
+                return nil, err
+            }
            maxDeadErrors--
            if maxDeadErrors < 0 {
                s.log.Errorf(s.logId, "Retriable transaction failed due to too many dead connections")
@@ -331,12 +342,10 @@ func (s *session) runRetriable(
         case e.IsRetriableCluster():
             // Force routing tables to be updated before trying again
             s.router.Invalidate(s.databaseName)
-            maxClusterErrors--
-            if maxClusterErrors < 0 {
-                s.log.Errorf(s.logId, "Retriable transaction failed due to encountering too many cluster errors")
-                return nil, err
-            }
-            s.log.Debugf(s.logId, "Retrying transaction due to cluster error")
+            throttle = throttle.next()
+            d := throttle.delay()
+            s.log.Debugf(s.logId, "Retrying transaction due to cluster error after sleeping for %s", d.String())
+            s.sleep(d)
         case e.IsRetriableTransient():
             throttle = throttle.next()
             d := throttle.delay()
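The retry paths above pace themselves through the driver's throttler (throttle.next() / throttle.delay()), whose implementation is not part of this diff. As a rough mental model only, and not the driver's actual code, such a throttler could be a doubling delay with a cap:

// Hypothetical throttler with the same call shape as in the diff:
// next() grows the delay (capped), delay() reports how long to sleep.
package main

import (
	"fmt"
	"time"
)

type throttler time.Duration

func (t throttler) next() throttler {
	d := 2 * time.Duration(t)
	if max := 10 * time.Second; d > max {
		d = max
	}
	return throttler(d)
}

func (t throttler) delay() time.Duration {
	return time.Duration(t)
}

func main() {
	t := throttler(500 * time.Millisecond)
	for i := 0; i < 4; i++ {
		t = t.next()
		fmt.Println(t.delay()) // 1s, 2s, 4s, 8s
	}
}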
