Commit c3e288e

Rework partitioned data generation (#168)
1 parent: 70b724f

4 files changed: +49 -26 lines

Makefile

Lines changed: 7 additions & 3 deletions
@@ -6,6 +6,10 @@ VENV=.venv
 VENV_BIN=$(VENV)/bin
 NUM_PARTITIONS=10
 
+# for data-table-partitioned
+NUM_BATCHES?=1 ## data split into this number of batches, more batches reduce disk space required for temporary tbl files
+PARALLELISM?=8 ## number of parallel data generation processes, can be 1, unless NUM_BATCHES is 1
+
 .venv: ## Set up Python virtual environment and install dependencies
 	python3 -m venv $(VENV)
 	$(MAKE) install-deps
@@ -51,22 +55,22 @@ data/tables/scale-$(SCALE_FACTOR): .venv ## Generate data tables
 	# use tpch-cli
 	mkdir -p "data/tables/scale-$(SCALE_FACTOR)"
 	$(VENV_BIN)/tpchgen-cli --output-dir="data/tables/scale-$(SCALE_FACTOR)" --format=tbl -s $(SCALE_FACTOR)
-	$(VENV_BIN)/python -m scripts.prepare_data --num-parts=1 --tpch_gen_folder="data/tables/scale-$(SCALE_FACTOR)"
+	$(VENV_BIN)/python -m scripts.prepare_data --tpch_gen_folder="data/tables/scale-$(SCALE_FACTOR)"
 
 	# use tpch-dbgen
 	# $(MAKE) -C tpch-dbgen dbgen
 	# cd tpch-dbgen && ./dbgen -vf -s $(SCALE_FACTOR) && cd ..
 	# mkdir -p "data/tables/scale-$(SCALE_FACTOR)"
 	# mv tpch-dbgen/*.tbl data/tables/scale-$(SCALE_FACTOR)/
-	# $(VENV_BIN)/python -m scripts.prepare_data --num-parts=1 --tpch_gen_folder="data/tables/scale-$(SCALE_FACTOR)"
+	# $(VENV_BIN)/python -m scripts.prepare_data --tpch_gen_folder="data/tables/scale-$(SCALE_FACTOR)"
 	rm -rf data/tables/scale-$(SCALE_FACTOR)/*.tbl
 
 .PHONY: data-tables-partitioned
 data-tables-partitioned: data/tables/scale-$(SCALE_FACTOR)/${NUM_PARTITIONS}
 
 data/tables/scale-$(SCALE_FACTOR)/${NUM_PARTITIONS}: .venv ## Generate partitioned data tables (these are not yet runnable with current repo)
 	$(MAKE) -C tpch-dbgen dbgen
-	$(VENV_BIN)/python -m scripts.prepare_data --num-parts=${NUM_PARTITIONS} --tpch_gen_folder="data/tables/scale-$(SCALE_FACTOR)"
+	$(VENV_BIN)/python -m scripts.prepare_data --num-batches=${NUM_BATCHES} --parallelism=${PARALLELISM} --tpch_gen_folder="data/tables/scale-$(SCALE_FACTOR)"
 
 
 endif
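
Usage note (the values below are illustrative, not project defaults): the partitioned target can be driven by overriding the new variables on the command line, e.g.

    make data-tables-partitioned SCALE_FACTOR=10 NUM_BATCHES=4 PARALLELISM=8

With NUM_BATCHES=4 and PARALLELISM=8 the script generates 4 x 8 = 32 dbgen partitions, eight at a time, converting each batch of .tbl files to Parquet before the next batch is generated.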

queries/common_utils.py

Lines changed: 9 additions & 1 deletion
@@ -23,7 +23,15 @@
 def get_table_path(table_name: str) -> Path:
     """Return the path to the given table."""
     ext = settings.run.io_type if settings.run.include_io else "parquet"
-    return settings.dataset_base_dir / f"{table_name}.{ext}"
+    if settings.num_batches is None:
+        return settings.dataset_base_dir / f"{table_name}.{ext}"
+    return (
+        settings.dataset_base_dir
+        / str(settings.num_batches)
+        / table_name
+        / "*"
+        / f"part.{ext}"
+    )
 
 
 def log_query_timing(
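
For illustration (the base directory and num_batches value are hypothetical): with dataset_base_dir set to data/tables/scale-1 and num_batches=2, get_table_path("lineitem") now resolves to the glob data/tables/scale-1/2/lineitem/*/part.parquet rather than a single file, and Polars scan functions accept such globs directly:

    import polars as pl

    # Hypothetical partitioned layout produced by scripts/prepare_data.py:
    #   data/tables/scale-1/2/lineitem/0_0/part.parquet
    #   data/tables/scale-1/2/lineitem/0_1/part.parquet
    #   ...
    lf = pl.scan_parquet("data/tables/scale-1/2/lineitem/*/part.parquet")
    print(lf.select(pl.len()).collect())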

scripts/prepare_data.py

Lines changed: 32 additions & 22 deletions
@@ -60,26 +60,27 @@ def gen_csv(part_idx: int, cachedir: str, scale_factor: float, num_parts: int) -
 def pipelined_data_generation(
     scratch_dir: str,
     scale_factor: float,
-    num_parts: int,
+    num_batches: int,
     aws_s3_sync_location: str,
     parallelism: int = 4,
     rows_per_file: int = 500_000,
 ) -> None:
-    assert num_parts > 1, "script should only be used if num_parts > 1"
-
     if aws_s3_sync_location.endswith("/"):
         aws_s3_sync_location = aws_s3_sync_location[:-1]
 
-    base_path = pathlib.Path(scratch_dir) / str(num_parts)
+    base_path = pathlib.Path(scratch_dir) / str(num_batches)
     base_path.mkdir(parents=True, exist_ok=True)
 
-    for i, part_indices in enumerate(batch(range(1, num_parts + 1), n=parallelism)):
+    num_dbgen_partitions = num_batches * parallelism
+    for batch_idx, part_indices in enumerate(
+        batch(range(1, num_dbgen_partitions + 1), n=parallelism)
+    ):
         logger.info("Partition %s: Generating CSV files", part_indices)
         with Pool(parallelism) as process_pool:
             process_pool.starmap(
                 gen_csv,
                 [
-                    (part_idx, base_path, scale_factor, num_parts)
+                    (part_idx, base_path, scale_factor, num_dbgen_partitions)
                     for part_idx in part_indices
                 ],
             )
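
(Illustration of the new batching arithmetic, with example values: with num_batches=2 and parallelism=4, num_dbgen_partitions is 8; batch 0 generates dbgen partitions 1-4 in parallel and converts them to Parquet before batch 1 generates partitions 5-8, so only one batch of temporary .tbl files exists on disk at a time.)
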
@@ -88,20 +89,13 @@
         for f in csv_files:
             shutil.move(f, base_path / pathlib.Path(f).name)
 
-        gen_parquet(base_path, rows_per_file, partitioned=True, iteration_offset=i)
+        gen_parquet(base_path, rows_per_file, partitioned=True, batch_idx=batch_idx)
         parquet_files = glob.glob(f"{base_path}/*.parquet")  # noqa: PTH207
 
-        # Exclude static tables except for first iteration
-        exclude_static_tables = (
-            ""
-            if i == 0
-            else " ".join([f'--exclude "*/{tbl}/*"' for tbl in STATIC_TABLES])
-        )
-
         if len(aws_s3_sync_location):
             subprocess.check_output(
                 shlex.split(
-                    f'aws s3 sync {scratch_dir} {aws_s3_sync_location}/scale-factor-{scale_factor} --exclude "*" --include "*.parquet" {exclude_static_tables}'
+                    f'aws s3 sync {scratch_dir} {aws_s3_sync_location}/scale-{scale_factor} --exclude "*" --include "*.parquet"'
                 )
             )
             for parquet_file in parquet_files:
@@ -197,9 +191,12 @@ def gen_parquet(
     base_path: pathlib.Path,
     rows_per_file: int = 500_000,
     partitioned: bool = False,
-    iteration_offset: int = 0,
+    batch_idx: int = 0,
 ) -> None:
     for table_name, columns in table_columns.items():
+        if table_name in STATIC_TABLES and batch_idx != 0:
+            continue
+
         path = base_path / f"{table_name}.tbl*"
 
         lf = pl.scan_csv(
@@ -214,9 +211,18 @@
         lf = lf.select(columns)
 
         if partitioned:
-            (base_path / table_name).mkdir(parents=True, exist_ok=True)
-            path = base_path / table_name / f"{iteration_offset}_{{part}}.parquet"
-            lf.sink_parquet(pl.PartitionMaxSize(path, max_size=rows_per_file))
+
+            def partition_file_name(ctx: pl.BasePartitionContext) -> pathlib.Path:
+                partition = f"{batch_idx}_{ctx.file_idx}"
+                (base_path / table_name / partition).mkdir(parents=True, exist_ok=True)  # noqa: B023
+                return pathlib.Path(partition) / "part.parquet"
+
+            path = base_path / table_name
+            lf.sink_parquet(
+                pl.PartitionMaxSize(
+                    path, file_path=partition_file_name, max_size=rows_per_file
+                )
+            )
         else:
             path = base_path / f"{table_name}.parquet"
             lf.sink_parquet(path)
@@ -242,7 +248,11 @@ def gen_parquet(
         type=int,
     )
     parser.add_argument(
-        "--num-parts", default=32, help="Number of parts to generate", type=int
+        "--num-batches",
+        default=None,
+        help="Number of batches used to generate the data",
+        type=int,
+        nargs="?",
     )
     parser.add_argument(
         "--aws-s3-sync-location",
@@ -257,7 +267,7 @@
     )
     args = parser.parse_args()
 
-    if args.num_parts == 1:
+    if args.num_batches is None:
         # Assumes the tables are already created by the Makefile
         gen_parquet(
             pathlib.Path(args.tpch_gen_folder),
@@ -268,7 +278,7 @@
         pipelined_data_generation(
             args.tpch_gen_folder,
             args.scale_factor,
-            args.num_parts,
+            args.num_batches,
             args.aws_s3_sync_location,
             parallelism=args.parallelism,
             rows_per_file=args.rows_per_file,
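
A minimal, self-contained sketch of the partition_file_name / pl.PartitionMaxSize pattern introduced above (the output directory, batch index, data and max_size are illustrative; assumes a Polars version that supports PartitionMaxSize with a file_path callback, as the diff does):

    import pathlib

    import polars as pl

    out_dir = pathlib.Path("out") / "lineitem"  # hypothetical output directory
    batch_idx = 0  # would be supplied by the outer batch loop
    lf = pl.LazyFrame({"value": range(10)})  # stand-in for the scanned .tbl data


    def partition_file_name(ctx: pl.BasePartitionContext) -> pathlib.Path:
        # One sub-directory per output file: <batch_idx>_<file_idx>/part.parquet
        partition = f"{batch_idx}_{ctx.file_idx}"
        (out_dir / partition).mkdir(parents=True, exist_ok=True)
        return pathlib.Path(partition) / "part.parquet"


    lf.sink_parquet(
        pl.PartitionMaxSize(out_dir, file_path=partition_file_name, max_size=4)
    )
    # Expected layout: out/lineitem/0_0/part.parquet, out/lineitem/0_1/part.parquet, ...

Writing each file into its own directory is what lets get_table_path() in queries/common_utils.py address a table with a single */part.parquet glob.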

settings.py

Lines changed: 1 addition & 0 deletions
@@ -77,6 +77,7 @@ class Plot(BaseSettings):
 
 class Settings(BaseSettings):
     scale_factor: float = 1.0
+    num_batches: int | None = None
 
     paths: Paths = Paths()
     plot: Plot = Plot()
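
Sketch of how the new field selects the layout used by get_table_path() (constructing Settings with a keyword override is an assumption about how the object is created; as a pydantic BaseSettings subclass it can also pick the value up from the environment):

    from settings import Settings

    settings = Settings()                  # num_batches is None -> <table>.parquet
    settings = Settings(num_batches=4)     # -> <base_dir>/4/<table>/*/part.parquet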
