@@ -2,27 +2,64 @@ use flate2::read::GzDecoder;
2
2
use indicatif:: { MultiProgress , ProgressBar , ProgressStyle } ;
3
3
use reqwest:: { header, Client , Url } ;
4
4
use reqwest_middleware:: { ClientBuilder , ClientWithMiddleware } ;
5
- use reqwest_retry:: policies:: ExponentialBackoff ;
6
- use reqwest_retry :: Jitter ;
7
- use reqwest_retry :: RetryTransientMiddleware ;
8
- use std :: fs :: File ;
9
- use std :: io :: BufRead ;
10
- use std :: io :: BufReader ;
11
- use std :: path :: { Path , PathBuf } ;
12
- use std :: process ;
13
- use std :: sync :: Arc ;
14
- use std :: time :: Duration ;
15
- use tokio :: io:: AsyncWriteExt ;
16
- use tokio :: io :: BufWriter ;
17
- use tokio :: sync :: Semaphore ;
18
- use tokio :: task :: JoinSet ;
5
+ use reqwest_retry:: { policies:: ExponentialBackoff , Jitter , RetryTransientMiddleware } ;
6
+ use std :: {
7
+ fs :: File ,
8
+ io :: { BufRead , BufReader } ,
9
+ path :: { Path , PathBuf } ,
10
+ process , str ,
11
+ sync :: Arc ,
12
+ time :: Duration ,
13
+ } ;
14
+ use tokio :: {
15
+ io:: { AsyncWriteExt , BufWriter } ,
16
+ sync :: Semaphore ,
17
+ task :: JoinSet ,
18
+ } ;
19
19
20
20
use crate :: errors:: DownloadError ;
21
21
22
22
const BASE_URL : & str = "https://data.commoncrawl.org/" ;
23
23
24
24
static APP_USER_AGENT : & str = concat ! ( env!( "CARGO_PKG_NAME" ) , "/" , env!( "CARGO_PKG_VERSION" ) , ) ;
25
25
26
+ pub struct DownloadOptions < ' a > {
27
+ pub snapshot : String ,
28
+ pub data_type : & ' a str ,
29
+ pub paths : & ' a Path ,
30
+ pub dst : & ' a Path ,
31
+ pub threads : usize ,
32
+ pub max_retries : usize ,
33
+ pub numbered : bool ,
34
+ pub files_only : bool ,
35
+ pub progress : bool ,
36
+ }
37
+
38
+ pub struct TaskOptions {
39
+ pub number : usize ,
40
+ pub path : String ,
41
+ pub dst : PathBuf ,
42
+ pub numbered : bool ,
43
+ pub files_only : bool ,
44
+ pub progress : bool ,
45
+ }
46
+
47
+ impl Default for DownloadOptions < ' _ > {
48
+ fn default ( ) -> Self {
49
+ DownloadOptions {
50
+ snapshot : "" . to_string ( ) ,
51
+ data_type : "" ,
52
+ paths : Path :: new ( "" ) ,
53
+ dst : Path :: new ( "" ) ,
54
+ threads : 1 ,
55
+ max_retries : 1000 ,
56
+ numbered : false ,
57
+ files_only : false ,
58
+ progress : false ,
59
+ }
60
+ }
61
+ }
62
+
26
63
fn new_client ( max_retries : usize ) -> Result < ClientWithMiddleware , DownloadError > {
27
64
let retry_policy = ExponentialBackoff :: builder ( )
28
65
. retry_bounds ( Duration :: from_secs ( 1 ) , Duration :: from_secs ( 3600 ) )
@@ -37,16 +74,15 @@ fn new_client(max_retries: usize) -> Result<ClientWithMiddleware, DownloadError>
37
74
. build ( ) )
38
75
}
39
76
40
- pub async fn download_paths (
41
- snapshot : & String ,
42
- data_type : & str ,
43
- dst : & Path ,
44
- ) -> Result < ( ) , DownloadError > {
45
- let paths = format ! ( "{}crawl-data/{}/{}.paths.gz" , BASE_URL , snapshot, data_type) ;
77
+ pub async fn download_paths ( options : DownloadOptions < ' _ > ) -> Result < ( ) , DownloadError > {
78
+ let paths = format ! (
79
+ "{}crawl-data/{}/{}.paths.gz" ,
80
+ BASE_URL , options. snapshot, options. data_type
81
+ ) ;
46
82
println ! ( "Downloading paths from: {}" , paths) ;
47
83
let url = Url :: parse ( & paths) ?;
48
84
49
- let client = new_client ( 1000 ) ?;
85
+ let client = new_client ( options . max_retries ) ?;
50
86
51
87
let filename = url
52
88
. path_segments ( ) // Splits into segments of the URL
@@ -55,7 +91,7 @@ pub async fn download_paths(
55
91
56
92
let request = client. get ( url. as_str ( ) ) ;
57
93
58
- let mut dst = dst. to_path_buf ( ) ;
94
+ let mut dst = options . dst . to_path_buf ( ) ;
59
95
60
96
dst. push ( filename) ;
61
97
@@ -79,16 +115,11 @@ pub async fn download_paths(
79
115
80
116
async fn download_task (
81
117
client : ClientWithMiddleware ,
82
- download_url : String ,
83
- number : usize ,
84
118
multibar : Arc < MultiProgress > ,
85
- dst : PathBuf ,
86
- numbered : bool ,
87
- files_only : bool ,
88
- progress : bool ,
119
+ task_options : TaskOptions ,
89
120
) -> Result < ( ) , DownloadError > {
90
121
// Parse URL into Url type
91
- let url = Url :: parse ( & download_url ) ?;
122
+ let url = Url :: parse ( & task_options . path ) ?;
92
123
93
124
// We need to determine the file size before we download, so we can create a ProgressBar
94
125
// A Header request for the CONTENT_LENGTH header gets us the file size
@@ -109,17 +140,17 @@ async fn download_task(
109
140
} ;
110
141
111
142
// Parse the filename from the given URL
112
- let filename = if numbered {
113
- & format ! ( "{}{}" , number, ".txt.gz" )
114
- } else if files_only {
143
+ let filename = if task_options . numbered {
144
+ & format ! ( "{}{}" , task_options . number, ".txt.gz" )
145
+ } else if task_options . files_only {
115
146
url. path_segments ( )
116
147
. and_then ( |segments| segments. last ( ) )
117
148
. unwrap_or ( "file.download" )
118
149
} else {
119
150
url. path ( ) . strip_prefix ( "/" ) . unwrap_or ( "file.download" )
120
151
} ;
121
152
122
- let mut dst = dst. clone ( ) ;
153
+ let mut dst = task_options . dst . clone ( ) ;
123
154
124
155
dst. push ( filename) ;
125
156
@@ -130,7 +161,7 @@ async fn download_task(
130
161
// and add it to the multibar
131
162
let progress_bar = multibar. add ( ProgressBar :: new ( download_size) ) ;
132
163
133
- if progress {
164
+ if task_options . progress {
134
165
// Set Style to the ProgressBar
135
166
progress_bar. set_style (
136
167
ProgressStyle :: default_bar ( )
@@ -145,7 +176,7 @@ async fn download_task(
145
176
}
146
177
147
178
// Create the directory if it doesn't exist
148
- if !numbered {
179
+ if !task_options . numbered {
149
180
if let Some ( parent) = dst. parent ( ) {
150
181
tokio:: fs:: create_dir_all ( parent) . await ?;
151
182
}
@@ -163,13 +194,13 @@ async fn download_task(
163
194
// We use the part from the reqwest-tokio example here on purpose
164
195
// This way, we are able to increase the ProgressBar with every downloaded chunk
165
196
while let Some ( chunk) = download. chunk ( ) . await ? {
166
- if progress {
197
+ if task_options . progress {
167
198
progress_bar. inc ( chunk. len ( ) as u64 ) ; // Increase ProgressBar by chunk size
168
199
}
169
200
outfile. write_all ( & chunk) . await ?; // Write chunk to output file
170
201
}
171
202
172
- if progress {
203
+ if task_options . progress {
173
204
// Finish the progress bar to prevent glitches
174
205
progress_bar. finish ( ) ;
175
206
@@ -187,22 +218,18 @@ async fn download_task(
187
218
Ok ( ( ) )
188
219
}
189
220
190
- pub async fn download (
191
- paths : & Path ,
192
- dst : & Path ,
193
- threads : usize ,
194
- max_retries : usize ,
195
- numbered : bool ,
196
- files_only : bool ,
197
- progress : bool ,
198
- ) -> Result < ( ) , DownloadError > {
221
+ pub async fn download ( options : DownloadOptions < ' _ > ) -> Result < ( ) , DownloadError > {
199
222
// A vector containing all the URLs to download
200
223
201
224
let file = {
202
- let gzip_file = match File :: open ( paths) {
225
+ let gzip_file = match File :: open ( options . paths ) {
203
226
Ok ( file) => file,
204
227
Err ( e) => {
205
- eprintln ! ( "Could not open file {}\n Error: {}" , paths. display( ) , e) ;
228
+ eprintln ! (
229
+ "Could not open file {}\n Error: {}" ,
230
+ options. paths. display( ) ,
231
+ e
232
+ ) ;
206
233
process:: exit ( 1 )
207
234
}
208
235
} ;
@@ -232,7 +259,7 @@ pub async fn download(
232
259
) ;
233
260
234
261
// Only set the style if we are showing progress
235
- if progress {
262
+ if options . progress {
236
263
main_pb. set_style (
237
264
indicatif:: ProgressStyle :: default_bar ( ) . template ( "{msg} {bar:10} {pos}/{len}" ) ?,
238
265
) ;
@@ -243,25 +270,30 @@ pub async fn download(
243
270
main_pb. tick ( ) ;
244
271
}
245
272
246
- let client = new_client ( max_retries) ?;
273
+ let client = new_client ( options . max_retries ) ?;
247
274
248
- let semaphore = Arc :: new ( Semaphore :: new ( threads) ) ;
275
+ let semaphore = Arc :: new ( Semaphore :: new ( options . threads ) ) ;
249
276
let mut set = JoinSet :: new ( ) ;
250
277
251
278
for ( number, path) in paths {
252
279
// Clone multibar and main_pb. We will move the clones into each task.
253
280
let multibar = multibar. clone ( ) ;
254
281
let main_pb = main_pb. clone ( ) ;
255
282
let client = client. clone ( ) ;
256
- let dst = dst. to_path_buf ( ) ;
283
+ let dst = options . dst . to_path_buf ( ) ;
257
284
let semaphore = semaphore. clone ( ) ;
258
285
set. spawn ( async move {
259
286
let _permit = semaphore. acquire ( ) . await ;
260
- let res = download_task (
261
- client, path, number, multibar, dst, numbered, files_only, progress,
262
- )
263
- . await ;
264
- if progress {
287
+ let task_options = TaskOptions {
288
+ path,
289
+ number,
290
+ dst,
291
+ numbered : options. numbered ,
292
+ files_only : options. files_only ,
293
+ progress : options. progress ,
294
+ } ;
295
+ let res = download_task ( client, multibar, task_options) . await ;
296
+ if options. progress {
265
297
// Increment the main progress bar.
266
298
main_pb. inc ( 1 ) ;
267
299
}
@@ -289,7 +321,7 @@ pub async fn download(
289
321
}
290
322
}
291
323
292
- if progress {
324
+ if options . progress {
293
325
// Change the message on the overall progress indicator.
294
326
main_pb. finish_with_message ( "done" ) ;
295
327
0 commit comments