Skip to content

Commit 07fb3de

Browse files
author
Peter Wilhelmsson
committed
Remove discarded stream
The stream will not be readable after it has been discarded. This could cause applications to get an error that they didn't previously got IF they use results outside of transaction boundary (which is not allowed) BUT even if they previously didn't see an error they actually didn't get the correct result.
1 parent 9039d1b commit 07fb3de

File tree

3 files changed

+12
-6
lines changed

3 files changed

+12
-6
lines changed

neo4j/internal/bolt/bolt4.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,7 @@ func (b *bolt4) discardStream() {
413413
if batch {
414414
if discarded {
415415
// Response to discard, see below
416+
b.streams.remove(stream)
416417
return
417418
}
418419
// Discard all! After this the next receive will get another batch
@@ -423,6 +424,7 @@ func (b *bolt4) discardStream() {
423424
b.out.appendDiscardNQid(stream.fetchSize, stream.qid)
424425
b.out.send(b.conn)
425426
} else if sum != nil || b.err != nil {
427+
b.streams.remove(stream)
426428
return
427429
}
428430
}

neo4j/internal/bolt/bolt4_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ func TestBolt4(ot *testing.T) {
318318

319319
err = bolt.TxCommit(tx)
320320
AssertNoError(t, err)
321+
assertBoltState(t, bolt4_ready, bolt)
321322
})
322323

323324
ot.Run("Begin transaction with bookmark success", func(t *testing.T) {

neo4j/internal/bolt/stream.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,9 @@ func (o *openstreams) detach(sum *db.Summary, err error) {
115115
}
116116

117117
o.remove(o.curr)
118-
o.curr = nil
119-
if o.num <= 0 {
120-
o.onEmpty()
121-
}
122118
}
123119

124-
// Streams can be paused when they have received a 'has_more' response from server
120+
// Streams can be paused when they have received a "has_more" response from server
125121
// Pauses the current stream
126122
func (o *openstreams) pause() {
127123
o.curr = nil
@@ -136,10 +132,17 @@ func (o *openstreams) resume(s *stream) {
136132
o.curr = s
137133
}
138134

139-
// Internal, "removes" the stream by setting it's corresponding entry to nil.
135+
// Removes the stream by disabling its key and removing it from the count of streams.
136+
// If the stream is current the current is set to nil.
140137
func (o *openstreams) remove(s *stream) {
141138
o.num--
142139
s.key = 0
140+
if o.curr == s {
141+
o.curr = nil
142+
}
143+
if o.num == 0 {
144+
o.onEmpty()
145+
}
143146
}
144147

145148
func (o *openstreams) reset() {

0 commit comments

Comments
 (0)