Skip to content

Commit 027891f

Browse files
committed
Handle transaction function panics
1 parent 3e643c1 commit 027891f

File tree

2 files changed

+67
-49
lines changed

2 files changed

+67
-49
lines changed

neo4j/session.go

Lines changed: 44 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -259,56 +259,9 @@ func (s *session) runRetriable(
259259
DatabaseName: s.databaseName,
260260
}
261261
for state.Continue() {
262-
// Establish new connection
263-
conn, err := s.getConnection(mode)
264-
if err != nil {
265-
state.OnFailure(conn, err, false)
266-
continue
267-
}
268-
269-
// Begin transaction
270-
txHandle, err := conn.TxBegin(db.TxConfig{
271-
Mode: mode,
272-
Bookmarks: s.bookmarks,
273-
Timeout: config.Timeout,
274-
Meta: config.Metadata,
275-
})
276-
if err != nil {
277-
state.OnFailure(conn, err, false)
278-
s.pool.Return(conn)
279-
continue
262+
if workResult := s.tryRun(&state, mode, &config, work); workResult != nil {
263+
return workResult, nil
280264
}
281-
282-
// Construct a transaction like thing for client to execute stuff on
283-
// and invoke the client work function.
284-
tx := retryableTransaction{conn: conn, fetchSize: s.fetchSize, txHandle: txHandle}
285-
x, err := work(&tx)
286-
// Evaluate the returned error from all the work for retryable, this means
287-
// that client can mess up the error handling.
288-
if err != nil {
289-
// If the client returns a client specific error that means that
290-
// client wants to rollback. We don't do an explicit rollback here
291-
// but instead realy on pool invoking reset on the connection, that
292-
// will do an implicit rollback.
293-
state.OnFailure(conn, err, false)
294-
s.pool.Return(conn)
295-
continue
296-
}
297-
298-
// Commit transaction
299-
err = conn.TxCommit(txHandle)
300-
if err != nil {
301-
state.OnFailure(conn, err, true)
302-
s.pool.Return(conn)
303-
continue
304-
}
305-
306-
// Collect bookmark and return connection to pool
307-
s.retrieveBookmarks(conn)
308-
s.pool.Return(conn)
309-
310-
// All well
311-
return x, nil
312265
}
313266

314267
// When retries has occured wrap the error, the last error is always added but
@@ -339,6 +292,48 @@ func (s *session) WriteTransaction(
339292
return s.runRetriable(db.WriteMode, work, configurers...)
340293
}
341294

295+
func (s *session) tryRun(state *retry.State, mode db.AccessMode, config *TransactionConfig, work TransactionWork) interface{} {
296+
// Establish new connection
297+
conn, err := s.getConnection(mode)
298+
if err != nil {
299+
state.OnFailure(conn, err, false)
300+
return nil
301+
}
302+
defer s.pool.Return(conn)
303+
txHandle, err := conn.TxBegin(db.TxConfig{
304+
Mode: mode,
305+
Bookmarks: s.bookmarks,
306+
Timeout: config.Timeout,
307+
Meta: config.Metadata,
308+
})
309+
if err != nil {
310+
state.OnFailure(conn, err, false)
311+
return nil
312+
}
313+
314+
tx := retryableTransaction{conn: conn, fetchSize: s.fetchSize, txHandle: txHandle}
315+
x, err := work(&tx)
316+
// Evaluate the returned error from all the work for retryable, this means
317+
// that client can mess up the error handling.
318+
if err != nil {
319+
// If the client returns a client specific error that means that
320+
// client wants to rollback. We don't do an explicit rollback here
321+
// but instead rely on pool invoking reset on the connection, that
322+
// will do an implicit rollback.
323+
state.OnFailure(conn, err, false)
324+
return nil
325+
}
326+
327+
err = conn.TxCommit(txHandle)
328+
if err != nil {
329+
state.OnFailure(conn, err, true)
330+
return nil
331+
}
332+
333+
s.retrieveBookmarks(conn)
334+
return x
335+
}
336+
342337
func (s *session) getServers(ctx context.Context, mode db.AccessMode) ([]string, error) {
343338
if mode == db.ReadMode {
344339
return s.router.Readers(ctx, s.bookmarks, s.databaseName, s.boltLogger)

neo4j/session_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,29 @@ func TestSession(st *testing.T) {
129129
AssertSameType(t, err.(*ConnectivityError).inner, &retry.CommitFailedDeadError{})
130130
assertCleanSessionState(t, sess)
131131
})
132+
133+
rt.Run("tx function panic returns conn to pool and bubbles up", func(t *testing.T) {
134+
connectionIsReturned := false
135+
_, pool, newSession := createSession()
136+
pool.BorrowConn = &ConnFake{Alive: true}
137+
pool.ReturnHook = func() {
138+
connectionIsReturned = true
139+
}
140+
transactionFunction := func(Transaction) (interface{}, error) {
141+
panic("oopsie")
142+
}
143+
144+
defer func() {
145+
err := recover()
146+
if err != "oopsie" {
147+
t.Errorf("expected panic to bubble up")
148+
}
149+
if !connectionIsReturned {
150+
t.Errorf("expected connection to return to pool")
151+
}
152+
}()
153+
_, _ = newSession.WriteTransaction(transactionFunction)
154+
})
132155
})
133156

134157
st.Run("Bookmarking", func(bt *testing.T) {

0 commit comments

Comments
 (0)