@@ -24,14 +24,21 @@ class GetJob {
24
24
public async start ( ) {
25
25
mkdirp . sync ( this . directory ) ;
26
26
try {
27
- await axios ( {
27
+ // Get data
28
+ const response = await axios ( {
28
29
method : "get" ,
29
30
responseType : "stream" ,
30
31
url : this . url ,
31
- } ) . then ( ( response ) => {
32
+ } ) ;
33
+
34
+ // Write to file
35
+ await new Promise ( async ( resolve ) => {
36
+ const stream = fs . createWriteStream ( this . directory + "/" + this . name + "." + this . extension ) ;
32
37
// noinspection TypeScriptValidateJSTypes
33
- response . data . pipe ( fs . createWriteStream ( this . directory + "/" + this . name + "." + this . extension ) ) ;
38
+ response . data . pipe ( stream ) ;
39
+ stream . on ( "finish" , resolve ) ;
34
40
} ) ;
41
+
35
42
} catch ( e ) {
36
43
this . logger . info ( `Downloading ${ this . url } failed` ) ;
37
44
this . logger . debug ( e ) ;
@@ -44,6 +51,10 @@ class GetJob {
44
51
* A pool of jobs that only executes k jobs 'simultaneously'
45
52
*/
46
53
export class GetPool {
54
+
55
+ // Job promises
56
+ public promises : Array < Promise < void > > = [ ] ;
57
+
47
58
// Jobs that are currently being executed
48
59
private runningJobs : GetJob [ ] = [ ] ;
49
60
@@ -62,6 +73,9 @@ export class GetPool {
62
73
// End-of-input signal triggered externally by close()
63
74
private finished : boolean = false ;
64
75
76
+ // End-of-input resolve function
77
+ private resolve : ( ) => { } ;
78
+
65
79
constructor ( connections : number = 1 ) {
66
80
this . maxConnections = connections ;
67
81
this . loop = setInterval ( ( ) => {
@@ -74,8 +88,9 @@ export class GetPool {
74
88
this . queuedJobs . push ( new GetJob ( url , name , extension , directory , logger ) ) ;
75
89
}
76
90
77
- public close ( ) {
91
+ public close ( resolve ) {
78
92
this . finished = true ;
93
+ this . resolve = resolve ;
79
94
}
80
95
81
96
private poolLoop ( ) {
@@ -97,13 +112,14 @@ export class GetPool {
97
112
// Add new jobs to empty running slots
98
113
while ( this . queuedJobs . length > 0 && this . runningJobs . length < this . maxConnections ) {
99
114
const job = this . queuedJobs . shift ( ) ;
100
- job . start ( ) ;
115
+ this . promises . push ( job . start ( ) ) ;
101
116
this . runningJobs . push ( job ) ;
102
117
}
103
118
104
119
// End the interval when end-of-input signal given
105
120
if ( this . finished && this . queuedJobs . length === 0 && this . runningJobs . length === 0 ) {
106
121
clearInterval ( this . loop ) ;
122
+ this . resolve ( ) ;
107
123
}
108
124
109
125
// Release lock
0 commit comments