Skip to content

Commit 98380f1

Browse files
fbivillePeter Wilhelmsson
authored andcommitted
Implement and use force-reset in session#run (Bolt v4)
Signed-off-by: Peter Wilhelmsson <peter.wilhelmsson@neo4j.com>
1 parent 2e39215 commit 98380f1

File tree

7 files changed

+128
-25
lines changed

7 files changed

+128
-25
lines changed

neo4j/db/connection.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ type Connection interface {
8888
// Resets connection to same state as directly after a connect.
8989
// Active streams will be discarded and the bookmark will be lost.
9090
Reset()
91+
ForceReset() error
9192
// Closes the database connection as well as any underlying connection.
9293
// The instance should not be used after being closed.
9394
Close()

neo4j/internal/bolt/bolt3.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -735,3 +735,7 @@ func (b *bolt3) Close() {
735735
b.conn.Close()
736736
b.state = bolt3_dead
737737
}
738+
739+
func (b *bolt3) ForceReset() error {
740+
return nil
741+
}

neo4j/internal/bolt/bolt4.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -896,3 +896,17 @@ func (b *bolt4) Close() {
896896
func (b *bolt4) SelectDatabase(database string) {
897897
b.databaseName = database
898898
}
899+
900+
func (b *bolt4) ForceReset() error {
901+
if b.state == bolt4_ready {
902+
b.out.appendReset()
903+
b.out.send(b.conn)
904+
if b.err != nil {
905+
return b.err
906+
}
907+
b.receiveMsg()
908+
return b.err
909+
}
910+
b.Reset()
911+
return b.err
912+
}

neo4j/internal/bolt/bolt4_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,37 @@ func TestBolt4(ot *testing.T) {
491491
bolt.Reset()
492492
})
493493

494+
ot.Run("Forces reset in ready state", func(t *testing.T) {
495+
bolt, cleanup := connectToServer(t, func(srv *bolt4server) {
496+
srv.accept(4)
497+
srv.waitForReset()
498+
srv.sendSuccess(map[string]interface{}{})
499+
})
500+
defer cleanup()
501+
defer bolt.Close()
502+
503+
err := bolt.ForceReset()
504+
AssertNoError(t, err)
505+
assertBoltState(t, bolt4_ready, bolt)
506+
})
507+
508+
ot.Run("Forces reset while streaming", func(t *testing.T) {
509+
bolt, cleanup := connectToServer(t, func(srv *bolt4server) {
510+
srv.accept(4)
511+
srv.serveRun(runResponse)
512+
srv.waitForReset()
513+
srv.sendSuccess(map[string]interface{}{})
514+
})
515+
defer cleanup()
516+
defer bolt.Close()
517+
_, err := bolt.Run(db.Command{Cypher: "MATCH (n) RETURN n"}, db.TxConfig{Mode: db.ReadMode})
518+
AssertNoError(t, err)
519+
520+
err = bolt.ForceReset()
521+
AssertNoError(t, err)
522+
assertBoltState(t, bolt4_ready, bolt)
523+
})
524+
494525
// Reset where state is ready
495526

496527
ot.Run("Buffer stream", func(t *testing.T) {

neo4j/internal/testutil/connfake.go

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,28 +40,29 @@ type RecordedTx struct {
4040
}
4141

4242
type ConnFake struct {
43-
Name string
44-
Version string
45-
Alive bool
46-
Birth time.Time
47-
Table *db.RoutingTable
48-
Err error
49-
Id int
50-
TxBeginHandle db.TxHandle
51-
RunStream db.StreamHandle
52-
RunTxStream db.StreamHandle
53-
Nexts []Next
54-
Bookm string
55-
TxCommitErr error
56-
TxCommitHook func()
57-
TxRollbackErr error
58-
ResetHook func()
59-
ConsumeSum *db.Summary
60-
ConsumeErr error
61-
ConsumeHook func()
62-
RecordedTxs []RecordedTx // Appended to by Run/TxBegin
63-
BufferErr error
64-
BufferHook func()
43+
Name string
44+
Version string
45+
Alive bool
46+
Birth time.Time
47+
Table *db.RoutingTable
48+
Err error
49+
Id int
50+
TxBeginHandle db.TxHandle
51+
RunStream db.StreamHandle
52+
RunTxStream db.StreamHandle
53+
Nexts []Next
54+
Bookm string
55+
TxCommitErr error
56+
TxCommitHook func()
57+
TxRollbackErr error
58+
ResetHook func()
59+
ConsumeSum *db.Summary
60+
ConsumeErr error
61+
ConsumeHook func()
62+
RecordedTxs []RecordedTx // Appended to by Run/TxBegin
63+
BufferErr error
64+
BufferHook func()
65+
ForceResetHook func() error
6566
}
6667

6768
func (c *ConnFake) ServerName() string {
@@ -145,3 +146,10 @@ func (c *ConnFake) Next(streamHandle db.StreamHandle) (*db.Record, *db.Summary,
145146
}
146147
return next.Record, next.Summary, next.Err
147148
}
149+
150+
func (c *ConnFake) ForceReset() error {
151+
if c.ForceResetHook != nil {
152+
return c.ForceResetHook()
153+
}
154+
return nil
155+
}

neo4j/session.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -397,9 +397,19 @@ func (s *session) Run(
397397
c(&config)
398398
}
399399

400-
conn, err := s.getConnection(s.defaultMode)
401-
if err != nil {
402-
return nil, err
400+
var (
401+
conn db.Connection
402+
err error
403+
)
404+
for {
405+
conn, err = s.getConnection(s.defaultMode)
406+
if err != nil {
407+
return nil, err
408+
}
409+
err = conn.ForceReset()
410+
if err == nil {
411+
break
412+
}
403413
}
404414

405415
stream, err := conn.Run(

neo4j/session_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,41 @@ func TestSession(st *testing.T) {
167167
})
168168

169169
st.Run("Run", func(bt *testing.T) {
170+
bt.Run("Forces reset on acquired connection", func(t *testing.T) {
171+
_, pool, sess := createSession()
172+
forceResetCalls := 0
173+
connection := &ConnFake{
174+
ForceResetHook: func() error {
175+
forceResetCalls++
176+
return nil
177+
},
178+
}
179+
pool.BorrowConn = connection
180+
181+
_, err := sess.Run("cypher", map[string]interface{}{})
182+
AssertNoError(t, err)
183+
AssertIntEqual(t, forceResetCalls, 1)
184+
})
185+
186+
bt.Run("Picks connection from the pool until force-reset succeeds", func(t *testing.T) {
187+
_, pool, sess := createSession()
188+
forceResetCalls := 0
189+
connection := &ConnFake{
190+
ForceResetHook: func() error {
191+
forceResetCalls++
192+
if forceResetCalls == 1 {
193+
return errors.New("force-reset failure")
194+
}
195+
return nil
196+
},
197+
}
198+
pool.BorrowConn = connection
199+
200+
_, err := sess.Run("cypher", map[string]interface{}{})
201+
AssertNoError(t, err)
202+
AssertIntEqual(t, forceResetCalls, 2)
203+
})
204+
170205
// Checks that chained Run results are buffered and that bookmarks are retrieved for
171206
// those and that a Consume on the last result also gives the appropriate bookmark.
172207
bt.Run("Chained and consume", func(t *testing.T) {

0 commit comments

Comments
 (0)