@@ -11,11 +11,14 @@ const ResendResponseResending = require('../../src/messages/ResendResponseResend
11
11
const ResendResponseResent = require ( '../../src/messages/ResendResponseResent' )
12
12
const StorageNodesMessage = require ( '../../src/messages/StorageNodesMessage' )
13
13
const UnicastMessage = require ( '../../src/messages/UnicastMessage' )
14
- const { wait } = require ( '../util' )
15
14
const { MessageID, MessageReference, StreamID } = require ( '../../src/identifiers' )
16
15
const NodeToNode = require ( '../../src/protocol/NodeToNode' )
17
16
const TrackerNode = require ( '../../src/protocol/TrackerNode' )
18
17
18
+ jest . useFakeTimers ( )
19
+
20
+ const TIMEOUT = 10000
21
+
19
22
/**
20
23
* Collect data of a stream into an array. The array is wrapped in a Promise
21
24
* that resolves when the stream has ended, i.e., event `end` is emitted by
@@ -135,7 +138,6 @@ describe('StorageResendStrategy#getResendResponseStream', () => {
135
138
} )
136
139
137
140
describe ( 'AskNeighborsResendStrategy#getResendResponseStream' , ( ) => {
138
- const TIMEOUT = 50
139
141
let nodeToNode
140
142
let getNeighbors
141
143
let resendStrategy
@@ -180,17 +182,20 @@ describe('AskNeighborsResendStrategy#getResendResponseStream', () => {
180
182
expect ( nodeToNode . send ) . toBeCalledWith ( 'neighbor-1' , request )
181
183
} )
182
184
183
- test ( 'if forwarding request to first neighbor fails, forwards to 2nd' , async ( ) => {
185
+ test ( 'if forwarding request to first neighbor fails, forwards to 2nd' , ( done ) => {
184
186
nodeToNode . send = jest . fn ( )
185
187
. mockReturnValueOnce ( Promise . reject ( ) )
186
188
. mockReturnValueOnce ( Promise . resolve ( ) )
187
189
188
190
resendStrategy . getResendResponseStream ( request )
189
- await wait ( 0 )
190
191
191
- expect ( nodeToNode . send ) . toBeCalledTimes ( 2 )
192
- expect ( nodeToNode . send ) . toBeCalledWith ( 'neighbor-1' , request )
193
- expect ( nodeToNode . send ) . toBeCalledWith ( 'neighbor-2' , request )
192
+ setImmediate ( ( ) => {
193
+ jest . runAllTimers ( )
194
+ expect ( nodeToNode . send ) . toBeCalledTimes ( 2 )
195
+ expect ( nodeToNode . send ) . toBeCalledWith ( 'neighbor-1' , request )
196
+ expect ( nodeToNode . send ) . toBeCalledWith ( 'neighbor-2' , request )
197
+ done ( )
198
+ } )
194
199
} )
195
200
196
201
test ( 'if forwarding request to both neighbors fails (maxTries=2) returns empty stream' , async ( ) => {
@@ -200,15 +205,16 @@ describe('AskNeighborsResendStrategy#getResendResponseStream', () => {
200
205
201
206
const responseStream = resendStrategy . getResendResponseStream ( request )
202
207
const streamAsArray = await streamToArray ( responseStream )
208
+
203
209
expect ( streamAsArray ) . toEqual ( [ ] )
204
210
} )
205
211
206
- test ( 'avoids forwarding request to same neighbor again' , ( ) => {
212
+ test ( 'avoids forwarding request to same neighbor again' , async ( ) => {
207
213
getNeighbors . mockClear ( )
208
214
getNeighbors . mockReturnValue ( [ 'neighbor-1' , 'neighbor-1' , 'neighbor-1' ] )
209
215
nodeToNode . send = jest . fn ( ) . mockReturnValue ( Promise . reject ( ) )
210
216
211
- resendStrategy . getResendResponseStream ( request )
217
+ await streamToArray ( resendStrategy . getResendResponseStream ( request ) )
212
218
213
219
expect ( nodeToNode . send ) . toBeCalledTimes ( 1 )
214
220
} )
@@ -224,57 +230,62 @@ describe('AskNeighborsResendStrategy#getResendResponseStream', () => {
224
230
expect ( nodeToNode . send ) . toBeCalledTimes ( 1 ) // sanity check
225
231
} )
226
232
227
- test ( 'if no response within timeout, move to next neighbor' , ( done ) => {
228
- setTimeout ( ( ) => {
229
- expect ( nodeToNode . send ) . toBeCalledTimes ( 2 )
230
- done ( )
231
- } , TIMEOUT )
233
+ test ( 'if no response within timeout, move to next neighbor' , ( ) => {
234
+ jest . advanceTimersByTime ( TIMEOUT )
235
+ expect ( nodeToNode . send ) . toBeCalledTimes ( 2 )
232
236
} )
233
237
234
238
test ( 'if neighbor disconnects, move to next neighbor' , ( ) => {
235
239
nodeToNode . emit ( NodeToNode . events . NODE_DISCONNECTED , 'neighbor-1' )
236
240
expect ( nodeToNode . send ) . toBeCalledTimes ( 2 )
237
241
} )
238
242
239
- test ( 'if neighbor responds with ResendResponseResending, extend timeout' , ( done ) => {
240
- setTimeout ( ( ) => {
241
- expect ( nodeToNode . send ) . toBeCalledTimes ( 1 )
242
- done ( )
243
- } , TIMEOUT )
243
+ test ( 'if neighbor responds with ResendResponseResending, extend timeout' , ( ) => {
244
+ jest . advanceTimersByTime ( TIMEOUT - 1 )
245
+ nodeToNode . emit (
246
+ NodeToNode . events . RESEND_RESPONSE ,
247
+ new ResendResponseResending ( new StreamID ( 'streamId' , 0 ) , 'subId' , 'neighbor-1' )
248
+ )
249
+ jest . advanceTimersByTime ( TIMEOUT - 1 )
244
250
245
- nodeToNode . emit ( NodeToNode . events . RESEND_RESPONSE ,
246
- new ResendResponseResending ( new StreamID ( 'streamId' , 0 ) , 'subId' , 'neighbor-1' ) )
251
+ expect ( nodeToNode . send ) . toBeCalledTimes ( 1 )
247
252
} )
248
253
249
- test ( 'if neighbor responds with UnicastMessage, extend timeout' , ( done ) => {
250
- setTimeout ( ( ) => {
251
- expect ( nodeToNode . send ) . toBeCalledTimes ( 1 )
252
- done ( )
253
- } , TIMEOUT )
254
+ test ( 'if neighbor responds with UnicastMessage, extend timeout' , ( ) => {
255
+ jest . advanceTimersByTime ( TIMEOUT - 1 )
256
+ nodeToNode . emit (
257
+ NodeToNode . events . UNICAST_RECEIVED ,
258
+ new UnicastMessage (
259
+ new MessageID ( new StreamID ( 'streamId' , 0 ) , 0 , 0 , '' , '' ) ,
260
+ null ,
261
+ { } ,
262
+ '' ,
263
+ 0 ,
264
+ 'subId' ,
265
+ 'neighbor-1'
266
+ )
267
+ )
268
+ jest . advanceTimersByTime ( TIMEOUT - 1 )
254
269
255
- nodeToNode . emit ( NodeToNode . events . UNICAST_RECEIVED , new UnicastMessage (
256
- new MessageID ( new StreamID ( 'streamId' , 0 ) , 0 , 0 , '' , '' ) ,
257
- null ,
258
- { } ,
259
- '' ,
260
- 0 ,
261
- 'subId' ,
262
- 'neighbor-1'
263
- ) )
270
+ expect ( nodeToNode . send ) . toBeCalledTimes ( 1 )
264
271
} )
265
272
266
273
test ( 'if neighbor responds with ResendResponseNoResend, move to next neighbor' , ( ) => {
267
- nodeToNode . emit ( NodeToNode . events . RESEND_RESPONSE ,
268
- new ResendResponseNoResend ( new StreamID ( 'streamId' , 0 ) , 'subId' , 'neighbor-1' ) )
274
+ nodeToNode . emit (
275
+ NodeToNode . events . RESEND_RESPONSE ,
276
+ new ResendResponseNoResend ( new StreamID ( 'streamId' , 0 ) , 'subId' , 'neighbor-1' )
277
+ )
269
278
expect ( nodeToNode . send ) . toBeCalledTimes ( 2 )
270
279
} )
271
280
272
281
test ( 'if neighbor responds with ResendResponseResent, returned stream is closed' , async ( ) => {
273
- nodeToNode . emit ( NodeToNode . events . RESEND_RESPONSE ,
274
- new ResendResponseResent ( new StreamID ( 'streamId' , 0 ) , 'subId' , 'neighbor-1' ) )
275
-
276
- await streamToArray ( responseStream )
282
+ nodeToNode . emit (
283
+ NodeToNode . events . RESEND_RESPONSE ,
284
+ new ResendResponseResent ( new StreamID ( 'streamId' , 0 ) , 'subId' , 'neighbor-1' )
285
+ )
277
286
287
+ // eslint-disable-next-line no-underscore-dangle
288
+ expect ( responseStream . _readableState . ended ) . toEqual ( true )
278
289
expect ( nodeToNode . send ) . toBeCalledTimes ( 1 ) // ensure next neighbor wasn't asked
279
290
} )
280
291
@@ -297,9 +308,9 @@ describe('AskNeighborsResendStrategy#getResendResponseStream', () => {
297
308
298
309
nodeToNode . emit ( NodeToNode . events . UNICAST_RECEIVED , u1 )
299
310
nodeToNode . emit ( NodeToNode . events . UNICAST_RECEIVED , u2 )
300
- await wait ( 10 )
311
+ jest . advanceTimersByTime ( TIMEOUT / 10 )
301
312
nodeToNode . emit ( NodeToNode . events . UNICAST_RECEIVED , u3 )
302
- await wait ( 10 )
313
+ jest . advanceTimersByTime ( TIMEOUT / 10 )
303
314
nodeToNode . emit ( NodeToNode . events . UNICAST_RECEIVED , u4 )
304
315
nodeToNode . emit ( NodeToNode . events . UNICAST_RECEIVED , u5 )
305
316
nodeToNode . emit ( NodeToNode . events . RESEND_RESPONSE ,
@@ -312,7 +323,6 @@ describe('AskNeighborsResendStrategy#getResendResponseStream', () => {
312
323
} )
313
324
314
325
describe ( 'StorageNodeResendStrategy#getResendResponseStream' , ( ) => {
315
- const TIMEOUT = 50
316
326
let nodeToNode
317
327
let trackerNode
318
328
let getTracker
@@ -380,6 +390,7 @@ describe('StorageNodeResendStrategy#getResendResponseStream', () => {
380
390
} )
381
391
382
392
test ( 'if tracker does not respond within timeout, returns empty stream' , async ( ) => {
393
+ jest . advanceTimersByTime ( TIMEOUT )
383
394
const streamAsArray = await streamToArray ( responseStream )
384
395
expect ( streamAsArray ) . toEqual ( [ ] )
385
396
} )
@@ -393,7 +404,7 @@ describe('StorageNodeResendStrategy#getResendResponseStream', () => {
393
404
expect ( streamAsArray ) . toEqual ( [ ] )
394
405
} )
395
406
396
- test ( 'if tracker responds with storage nodes, connects to them one by one until success' , async ( ) => {
407
+ test ( 'if tracker responds with storage nodes, connects to them one by one until success' , ( done ) => {
397
408
nodeToNode . disconnectFromNode = jest . fn ( )
398
409
nodeToNode . send = jest . fn ( ) . mockReturnValue ( Promise . resolve ( ) )
399
410
nodeToNode . connectToNode = jest . fn ( )
@@ -411,12 +422,15 @@ describe('StorageNodeResendStrategy#getResendResponseStream', () => {
411
422
'ws://storageNode-4'
412
423
] )
413
424
)
414
- await wait ( 0 ) // Defer execution of below assertions
415
425
416
- expect ( nodeToNode . connectToNode ) . toBeCalledTimes ( 3 )
417
- expect ( nodeToNode . connectToNode ) . toBeCalledWith ( 'ws://storageNode-1' )
418
- expect ( nodeToNode . connectToNode ) . toBeCalledWith ( 'ws://storageNode-2' )
419
- expect ( nodeToNode . connectToNode ) . toBeCalledWith ( 'ws://storageNode-3' )
426
+ setImmediate ( ( ) => {
427
+ jest . runAllTimers ( )
428
+ expect ( nodeToNode . connectToNode ) . toBeCalledTimes ( 3 )
429
+ expect ( nodeToNode . connectToNode ) . toBeCalledWith ( 'ws://storageNode-1' )
430
+ expect ( nodeToNode . connectToNode ) . toBeCalledWith ( 'ws://storageNode-2' )
431
+ expect ( nodeToNode . connectToNode ) . toBeCalledWith ( 'ws://storageNode-3' )
432
+ done ( )
433
+ } )
420
434
} )
421
435
422
436
test ( 'if tracker responds with non-connectable storage nodes, returns empty stream' , async ( ) => {
@@ -449,12 +463,12 @@ describe('StorageNodeResendStrategy#getResendResponseStream', () => {
449
463
responseStream = resendStrategy . getResendResponseStream ( request )
450
464
} )
451
465
452
- const emitTrackerResponse = async ( ) => {
466
+ const emitTrackerResponse = ( ) => {
453
467
trackerNode . emit (
454
468
TrackerNode . events . STORAGE_NODES_RECEIVED ,
455
469
new StorageNodesMessage ( new StreamID ( 'streamId' , 0 ) , [ 'ws://storageNode' ] )
456
470
)
457
- await wait ( 0 ) // defer execution
471
+ return new Promise ( ( resolve ) => setImmediate ( resolve ) )
458
472
}
459
473
460
474
test ( 'forwards request to storage node' , async ( ) => {
@@ -491,50 +505,47 @@ describe('StorageNodeResendStrategy#getResendResponseStream', () => {
491
505
describe ( 'after forwarding request to storage node' , ( ) => {
492
506
let responseStream
493
507
494
- beforeEach ( async ( ) => {
508
+ beforeEach ( ( done ) => {
495
509
getTracker . mockReturnValue ( [ 'tracker' ] )
496
510
trackerNode . findStorageNodes = ( ) => Promise . resolve ( )
497
511
nodeToNode . connectToNode = ( ) => Promise . resolve ( 'storageNode' )
498
512
nodeToNode . send = jest . fn ( ) . mockResolvedValue ( null )
499
513
nodeToNode . disconnectFromNode = jest . fn ( )
500
514
501
515
responseStream = resendStrategy . getResendResponseStream ( request )
502
- await wait ( 0 ) // wait for this.trackerNode.findStorageNodes(...)
503
-
504
- trackerNode . emit (
505
- TrackerNode . events . STORAGE_NODES_RECEIVED ,
506
- new StorageNodesMessage ( new StreamID ( 'streamId' , 0 ) , [ 'ws://storageNode' ] )
507
- )
508
- } )
509
516
510
- test ( 'if no response within timeout, returns empty stream' , ( done ) => {
511
- setTimeout ( async ( ) => {
512
- // eslint-disable-next-line no-underscore-dangle
513
- expect ( responseStream . _readableState . ended ) . toEqual ( true )
514
- const streamAsArray = await streamToArray ( responseStream )
515
- expect ( streamAsArray ) . toEqual ( [ ] )
517
+ setImmediate ( ( ) => { // wait for this.trackerNode.findStorageNodes(...)
518
+ trackerNode . emit (
519
+ TrackerNode . events . STORAGE_NODES_RECEIVED ,
520
+ new StorageNodesMessage ( new StreamID ( 'streamId' , 0 ) , [ 'ws://storageNode' ] )
521
+ )
516
522
done ( )
517
- } , TIMEOUT )
523
+ } )
518
524
} )
519
525
520
- test ( 'if storage node responds with ResendResponseResending, extend timeout' , ( done ) => {
521
- setTimeout ( ( ) => {
522
- // eslint-disable-next-line no-underscore-dangle
523
- expect ( responseStream . _readableState . ended ) . toEqual ( false )
524
- done ( )
525
- } , TIMEOUT )
526
+ test ( 'if no response within timeout, returns empty stream' , async ( ) => {
527
+ jest . advanceTimersByTime ( TIMEOUT )
526
528
527
- nodeToNode . emit ( NodeToNode . events . RESEND_RESPONSE ,
528
- new ResendResponseResending ( new StreamID ( 'streamId' , 0 ) , 'subId' , 'storageNode' ) )
529
+ // eslint-disable-next-line no-underscore-dangle
530
+ expect ( responseStream . _readableState . ended ) . toEqual ( true )
531
+ const streamAsArray = await streamToArray ( responseStream )
532
+ expect ( streamAsArray ) . toEqual ( [ ] )
529
533
} )
530
534
531
- test ( 'if storage node responds with UnicastMessage, extend timeout' , ( done ) => {
532
- setTimeout ( ( ) => {
533
- // eslint-disable-next-line no-underscore-dangle
534
- expect ( responseStream . _readableState . ended ) . toEqual ( false )
535
- done ( )
536
- } , TIMEOUT )
535
+ test ( 'if storage node responds with ResendResponseResending, extend timeout' , ( ) => {
536
+ jest . advanceTimersByTime ( TIMEOUT - 1 )
537
+ nodeToNode . emit (
538
+ NodeToNode . events . RESEND_RESPONSE ,
539
+ new ResendResponseResending ( new StreamID ( 'streamId' , 0 ) , 'subId' , 'storageNode' )
540
+ )
541
+ jest . advanceTimersByTime ( TIMEOUT - 1 )
537
542
543
+ // eslint-disable-next-line no-underscore-dangle
544
+ expect ( responseStream . _readableState . ended ) . toEqual ( false )
545
+ } )
546
+
547
+ test ( 'if storage node responds with UnicastMessage, extend timeout' , ( ) => {
548
+ jest . advanceTimersByTime ( TIMEOUT - 1 )
538
549
nodeToNode . emit ( NodeToNode . events . UNICAST_RECEIVED , new UnicastMessage (
539
550
new MessageID ( new StreamID ( 'streamId' , 0 ) , 0 , 0 , '' , '' ) ,
540
551
null ,
@@ -544,11 +555,17 @@ describe('StorageNodeResendStrategy#getResendResponseStream', () => {
544
555
'subId' ,
545
556
'storageNode'
546
557
) )
558
+ jest . advanceTimersByTime ( TIMEOUT - 1 )
559
+
560
+ // eslint-disable-next-line no-underscore-dangle
561
+ expect ( responseStream . _readableState . ended ) . toEqual ( false )
547
562
} )
548
563
549
564
test ( 'if storage node responds with ResendResponseNoResend, returned stream is closed' , ( ) => {
550
- nodeToNode . emit ( NodeToNode . events . RESEND_RESPONSE ,
551
- new ResendResponseNoResend ( new StreamID ( 'streamId' , 0 ) , 'subId' , 'storageNode' ) )
565
+ nodeToNode . emit (
566
+ NodeToNode . events . RESEND_RESPONSE ,
567
+ new ResendResponseNoResend ( new StreamID ( 'streamId' , 0 ) , 'subId' , 'storageNode' )
568
+ )
552
569
// eslint-disable-next-line no-underscore-dangle
553
570
expect ( responseStream . _readableState . ended ) . toEqual ( true )
554
571
} )
@@ -572,9 +589,9 @@ describe('StorageNodeResendStrategy#getResendResponseStream', () => {
572
589
573
590
nodeToNode . emit ( NodeToNode . events . UNICAST_RECEIVED , u1 )
574
591
nodeToNode . emit ( NodeToNode . events . UNICAST_RECEIVED , u2 )
575
- await wait ( 10 )
592
+ jest . advanceTimersByTime ( TIMEOUT / 10 )
576
593
nodeToNode . emit ( NodeToNode . events . UNICAST_RECEIVED , u3 )
577
- await wait ( 10 )
594
+ jest . advanceTimersByTime ( TIMEOUT / 10 )
578
595
nodeToNode . emit ( NodeToNode . events . UNICAST_RECEIVED , u4 )
579
596
nodeToNode . emit ( NodeToNode . events . UNICAST_RECEIVED , u5 )
580
597
nodeToNode . emit ( NodeToNode . events . RESEND_RESPONSE ,
@@ -602,15 +619,15 @@ describe('StorageNodeResendStrategy#getResendResponseStream', () => {
602
619
TrackerNode . events . STORAGE_NODES_RECEIVED ,
603
620
new StorageNodesMessage ( new StreamID ( 'streamId' , 0 ) , [ 'ws://storageNode' ] )
604
621
)
605
- } )
606
622
607
- // Causes the stream to end. Other ways to end are a) failing to forward request and b) timeout. All of
608
- // them have same handling logic so testing only one case here.
609
- setImmediate ( ( ) => {
610
- nodeToNode . emit (
611
- NodeToNode . events . RESEND_RESPONSE ,
612
- new ResendResponseResending ( new StreamID ( 'streamId' , 0 ) , 'subId' , 'storageNode' )
613
- )
623
+ // Causes the stream to end. Other ways to end are a) failing to forward request and b) timeout. All of
624
+ // them have same handling logic so testing only one case here.
625
+ setImmediate ( ( ) => {
626
+ nodeToNode . emit (
627
+ NodeToNode . events . RESEND_RESPONSE ,
628
+ new ResendResponseResent ( new StreamID ( 'streamId' , 0 ) , 'subId' , 'storageNode' )
629
+ )
630
+ } )
614
631
} )
615
632
} )
616
633
0 commit comments