@@ -72,7 +72,9 @@ def pipelined_data_generation(
72
72
base_path .mkdir (parents = True , exist_ok = True )
73
73
74
74
num_dbgen_partitions = num_batches * parallelism
75
- for batch_idx , part_indices in enumerate (batch (range (1 , num_dbgen_partitions + 1 ), n = parallelism )):
75
+ for batch_idx , part_indices in enumerate (
76
+ batch (range (1 , num_dbgen_partitions + 1 ), n = parallelism )
77
+ ):
76
78
logger .info ("Partition %s: Generating CSV files" , part_indices )
77
79
with Pool (parallelism ) as process_pool :
78
80
process_pool .starmap (
@@ -209,13 +211,18 @@ def gen_parquet(
209
211
lf = lf .select (columns )
210
212
211
213
if partitioned :
214
+
212
215
def partition_file_name (ctx ):
213
216
partition = f"{ batch_idx } _{ ctx .file_idx } "
214
217
(base_path / table_name / partition ).mkdir (parents = True , exist_ok = True ) # noqa: B023
215
218
return pathlib .Path (partition ) / "part.parquet"
216
219
217
220
path = base_path / table_name
218
- lf .sink_parquet (pl .PartitionMaxSize (path , file_path = partition_file_name , max_size = rows_per_file ))
221
+ lf .sink_parquet (
222
+ pl .PartitionMaxSize (
223
+ path , file_path = partition_file_name , max_size = rows_per_file
224
+ )
225
+ )
219
226
else :
220
227
path = base_path / f"{ table_name } .parquet"
221
228
lf .sink_parquet (path )
@@ -241,7 +248,11 @@ def partition_file_name(ctx):
241
248
type = int ,
242
249
)
243
250
parser .add_argument (
244
- "--num-batches" , default = None , help = "Number of batches used to generate the data" , type = int , nargs = "?"
251
+ "--num-batches" ,
252
+ default = None ,
253
+ help = "Number of batches used to generate the data" ,
254
+ type = int ,
255
+ nargs = "?"
245
256
)
246
257
parser .add_argument (
247
258
"--aws-s3-sync-location" ,
0 commit comments