@@ -7,6 +7,7 @@ import DataWrapper from '../storage/data/DataWrapper';
7
7
import * as http from 'http' ;
8
8
import StatsClient from '../metrics/StatsClient' ;
9
9
import { objectKeyByteLimit } from '../constants' ;
10
+ const jsutil = require ( '../jsutil' ) ;
10
11
11
12
export type CallApiMethod = (
12
13
methodName : string ,
@@ -409,14 +410,15 @@ function retrieveData(
409
410
return eachSeries ( locations ,
410
411
( current , next ) => data . get ( current , response , log ,
411
412
( err : any , readable : http . IncomingMessage ) => {
413
+ const cbOnce = jsutil . once ( next ) ;
412
414
// NB: readable is of IncomingMessage type
413
415
if ( err ) {
414
416
log . error ( 'failed to get object' , {
415
417
error : err ,
416
418
method : 'retrieveData' ,
417
419
} ) ;
418
420
_destroyResponse ( ) ;
419
- return next ( err ) ;
421
+ return cbOnce ( err ) ;
420
422
}
421
423
// response.isclosed is set by the S3 server. Might happen if
422
424
// the S3-client closes the connection before the first request
@@ -430,19 +432,19 @@ function retrieveData(
430
432
// @ts -ignore
431
433
responseErr . code = 'ResponseError' ;
432
434
responseErr . message = 'response closed by client request before all data sent' ;
433
- return next ( responseErr ) ;
435
+ return cbOnce ( responseErr ) ;
434
436
}
435
437
// readable stream successfully consumed
436
438
readable . on ( 'end' , ( ) => {
437
439
currentStream = null ;
438
440
log . debug ( 'readable stream end reached' ) ;
439
- return next ( ) ;
441
+ return cbOnce ( ) ;
440
442
} ) ;
441
443
// errors on server side with readable stream
442
444
readable . on ( 'error' , err => {
443
445
log . error ( 'error piping data from source' ) ;
444
446
_destroyResponse ( ) ;
445
- return next ( err ) ;
447
+ return cbOnce ( err ) ;
446
448
} ) ;
447
449
currentStream = readable ;
448
450
return readable . pipe ( response , { end : false } ) ;
0 commit comments