Skip to content

Commit 5dc4111

Browse files
author
Peter Wilhelmsson
committed
tests: Verify stream counting with consumes
1 parent 1d2b4c0 commit 5dc4111

File tree

1 file changed

+48
-0
lines changed

1 file changed

+48
-0
lines changed

neo4j/internal/bolt/bolt4_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,52 @@ func TestBolt4(ot *testing.T) {
321321
assertBoltState(t, bolt4_ready, bolt)
322322
})
323323

324+
// Verifies that current stream is discarded correctly even if it is larger
325+
// than what is served by a single pull.
326+
ot.Run("Commit while streams, explicit consume", func(t *testing.T) {
327+
qid := int64(2)
328+
bolt, cleanup := connectToServer(t, func(srv *bolt4server) {
329+
srv.accept(4)
330+
srv.waitForTxBegin()
331+
srv.send(msgSuccess, map[string]interface{}{})
332+
// First RunTx
333+
srv.waitForRun()
334+
srv.waitForPullN(1)
335+
// Send Pull response
336+
srv.send(msgSuccess, map[string]interface{}{"fields": []interface{}{"k"}, "t_first": int64(1), "qid": qid})
337+
// Driver should discard this stream which is small
338+
srv.send(msgRecord, []interface{}{"v1"})
339+
srv.send(msgSuccess, map[string]interface{}{"has_more": false})
340+
// Second RunTx
341+
srv.waitForRun()
342+
srv.waitForPullN(1)
343+
srv.send(msgSuccess, map[string]interface{}{"fields": []interface{}{"k"}, "t_first": int64(1), "qid": qid})
344+
// Driver should discard this stream, which is small
345+
srv.send(msgRecord, []interface{}{"v1"})
346+
srv.send(msgSuccess, map[string]interface{}{"has_more": false})
347+
// Wait for the commit
348+
srv.waitForTxCommit()
349+
srv.send(msgSuccess, map[string]interface{}{"bookmark": "x"})
350+
})
351+
defer cleanup()
352+
defer bolt.Close()
353+
354+
tx, err := bolt.TxBegin(db.TxConfig{Mode: db.ReadMode})
355+
AssertNoError(t, err)
356+
s, err := bolt.RunTx(tx, db.Command{Cypher: "Whatever", FetchSize: 1})
357+
AssertNoError(t, err)
358+
_, err = bolt.Consume(s)
359+
AssertNoError(t, err)
360+
s, err = bolt.RunTx(tx, db.Command{Cypher: "Whatever", FetchSize: 1})
361+
AssertNoError(t, err)
362+
_, err = bolt.Consume(s)
363+
AssertNoError(t, err)
364+
365+
err = bolt.TxCommit(tx)
366+
AssertNoError(t, err)
367+
assertBoltState(t, bolt4_ready, bolt)
368+
})
369+
324370
ot.Run("Begin transaction with bookmark success", func(t *testing.T) {
325371
committedBookmark := "cbm"
326372
bolt, cleanup := connectToServer(t, func(srv *bolt4server) {
@@ -655,6 +701,7 @@ func TestBolt4(ot *testing.T) {
655701
sum, err := bolt.Consume(stream)
656702
AssertNoError(t, err)
657703
AssertNotNil(t, sum)
704+
assertBoltState(t, bolt4_ready, bolt)
658705
// The bookmark should be set
659706
AssertStringEqual(t, bolt.Bookmark(), runBookmark)
660707
AssertStringEqual(t, sum.Bookmark, runBookmark)
@@ -696,6 +743,7 @@ func TestBolt4(ot *testing.T) {
696743
sum, err = bolt.Consume(stream)
697744
AssertNoError(t, err)
698745
AssertNotNil(t, sum)
746+
assertBoltState(t, bolt4_ready, bolt)
699747

700748
// The bookmark should be set
701749
AssertStringEqual(t, bolt.Bookmark(), bookmark)

0 commit comments

Comments
 (0)