Skip to content

Commit 9039d1b

Browse files
author
Peter Wilhelmsson
committed
Fixes problem with discarding big streams
If user hasn't consumed the result and the stream contains more than 1000 records (default fetch size) the discard could hang.
1 parent 2a69e97 commit 9039d1b

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

neo4j/internal/bolt/bolt4.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,9 +407,18 @@ func (b *bolt4) discardStream() {
407407
return
408408
}
409409

410+
discarded := false
410411
for {
411412
_, batch, sum := b.receiveNext()
412413
if batch {
414+
if discarded {
415+
// Response to discard, see below
416+
return
417+
}
418+
// Discard all! After this the next receive will get another batch
419+
// as a response to the discard, we need to keep track of that we
420+
// already sent a discard.
421+
discarded = true
413422
stream.fetchSize = -1
414423
b.out.appendDiscardNQid(stream.fetchSize, stream.qid)
415424
b.out.send(b.conn)

neo4j/internal/bolt/bolt4_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,42 @@ func TestBolt4(ot *testing.T) {
284284
AssertStringEqual(t, committedBookmark, bolt.Bookmark())
285285
})
286286

287+
// Verifies that current stream is discarded correctly even if it is larger
288+
// than what is served by a single pull.
289+
ot.Run("Commit while streaming", func(t *testing.T) {
290+
qid := int64(2)
291+
bolt, cleanup := connectToServer(t, func(srv *bolt4server) {
292+
srv.accept(4)
293+
srv.waitForTxBegin()
294+
srv.send(msgSuccess, map[string]interface{}{})
295+
srv.waitForRun()
296+
srv.waitForPullN(1)
297+
// Send Pull response
298+
srv.send(msgSuccess, map[string]interface{}{"fields": []interface{}{"k"}, "t_first": int64(1), "qid": qid})
299+
// ... and the record
300+
srv.send(msgRecord, []interface{}{"v1"})
301+
// ... and the batch summary
302+
srv.send(msgSuccess, map[string]interface{}{"has_more": true})
303+
// Wait for the discard message
304+
srv.waitForDiscardN(-1, int(qid))
305+
// Respond to discard with has more to indicate that there are more records
306+
srv.send(msgSuccess, map[string]interface{}{"has_more": true})
307+
// Wait for the commit
308+
srv.waitForTxCommit()
309+
srv.send(msgSuccess, map[string]interface{}{"bookmark": "x"})
310+
})
311+
defer cleanup()
312+
defer bolt.Close()
313+
314+
tx, err := bolt.TxBegin(db.TxConfig{Mode: db.ReadMode})
315+
AssertNoError(t, err)
316+
_, err = bolt.RunTx(tx, db.Command{Cypher: "Whatever", FetchSize: 1})
317+
AssertNoError(t, err)
318+
319+
err = bolt.TxCommit(tx)
320+
AssertNoError(t, err)
321+
})
322+
287323
ot.Run("Begin transaction with bookmark success", func(t *testing.T) {
288324
committedBookmark := "cbm"
289325
bolt, cleanup := connectToServer(t, func(srv *bolt4server) {

0 commit comments

Comments
 (0)