Merged
10 changes: 7 additions & 3 deletions Makefile
@@ -6,6 +6,10 @@ VENV=.venv
VENV_BIN=$(VENV)/bin
NUM_PARTITIONS=10

# for data-tables-partitioned
NUM_BATCHES?=1 ## data is split into this number of batches; more batches reduce the disk space required for temporary tbl files
PARALLELISM?=8 ## number of parallel data generation processes; can be 1 unless NUM_BATCHES is 1

.venv: ## Set up Python virtual environment and install dependencies
python3 -m venv $(VENV)
$(MAKE) install-deps
@@ -51,22 +55,22 @@ data/tables/scale-$(SCALE_FACTOR): .venv ## Generate data tables
# use tpch-cli
mkdir -p "data/tables/scale-$(SCALE_FACTOR)"
$(VENV_BIN)/tpchgen-cli --output-dir="data/tables/scale-$(SCALE_FACTOR)" --format=tbl -s $(SCALE_FACTOR)
$(VENV_BIN)/python -m scripts.prepare_data --num-parts=1 --tpch_gen_folder="data/tables/scale-$(SCALE_FACTOR)"
$(VENV_BIN)/python -m scripts.prepare_data --tpch_gen_folder="data/tables/scale-$(SCALE_FACTOR)"

# use tpch-dbgen
# $(MAKE) -C tpch-dbgen dbgen
# cd tpch-dbgen && ./dbgen -vf -s $(SCALE_FACTOR) && cd ..
# mkdir -p "data/tables/scale-$(SCALE_FACTOR)"
# mv tpch-dbgen/*.tbl data/tables/scale-$(SCALE_FACTOR)/
# $(VENV_BIN)/python -m scripts.prepare_data --num-parts=1 --tpch_gen_folder="data/tables/scale-$(SCALE_FACTOR)"
# $(VENV_BIN)/python -m scripts.prepare_data --tpch_gen_folder="data/tables/scale-$(SCALE_FACTOR)"
rm -rf data/tables/scale-$(SCALE_FACTOR)/*.tbl

.PHONY: data-tables-partitioned
data-tables-partitioned: data/tables/scale-$(SCALE_FACTOR)/${NUM_PARTITIONS}

data/tables/scale-$(SCALE_FACTOR)/${NUM_PARTITIONS}: .venv ## Generate partitioned data tables (these are not yet runnable with current repo)
$(MAKE) -C tpch-dbgen dbgen
$(VENV_BIN)/python -m scripts.prepare_data --num-parts=${NUM_PARTITIONS} --tpch_gen_folder="data/tables/scale-$(SCALE_FACTOR)"
$(VENV_BIN)/python -m scripts.prepare_data --num-batches=${NUM_BATCHES} --parallelism=${PARALLELISM} --tpch_gen_folder="data/tables/scale-$(SCALE_FACTOR)"


endif
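Note on the new variables: scripts/prepare_data.py derives the total number of dbgen partitions as num_batches * parallelism, so the partitioned target generates NUM_BATCHES x PARALLELISM dbgen partitions, PARALLELISM at a time. For example (values purely illustrative), make data-tables-partitioned SCALE_FACTOR=10 NUM_BATCHES=4 PARALLELISM=8 would produce 32 partitions in 4 batches of 8 parallel processes.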
10 changes: 9 additions & 1 deletion queries/common_utils.py
@@ -23,7 +23,15 @@
def get_table_path(table_name: str) -> Path:
"""Return the path to the given table."""
ext = settings.run.io_type if settings.run.include_io else "parquet"
return settings.dataset_base_dir / f"{table_name}.{ext}"
if settings.num_batches is None:
return settings.dataset_base_dir / f"{table_name}.{ext}"
return (
settings.dataset_base_dir
/ str(settings.num_batches)
/ table_name
/ "*"
/ f"part.{ext}"
)
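Note: the glob branch mirrors the directory layout written by scripts/prepare_data.py. Assuming, for illustration, that dataset_base_dir is data/tables/scale-1 and io_type is parquet, get_table_path("lineitem") resolves to data/tables/scale-1/lineitem.parquet when num_batches is None, and to the glob data/tables/scale-1/4/lineitem/*/part.parquet when num_batches is 4.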


def log_query_timing(
54 changes: 32 additions & 22 deletions scripts/prepare_data.py
@@ -60,26 +60,27 @@ def gen_csv(part_idx: int, cachedir: str, scale_factor: float, num_parts: int) -
def pipelined_data_generation(
scratch_dir: str,
scale_factor: float,
num_parts: int,
num_batches: int,
aws_s3_sync_location: str,
parallelism: int = 4,
rows_per_file: int = 500_000,
) -> None:
assert num_parts > 1, "script should only be used if num_parts > 1"

if aws_s3_sync_location.endswith("/"):
aws_s3_sync_location = aws_s3_sync_location[:-1]

base_path = pathlib.Path(scratch_dir) / str(num_parts)
base_path = pathlib.Path(scratch_dir) / str(num_batches)
base_path.mkdir(parents=True, exist_ok=True)

for i, part_indices in enumerate(batch(range(1, num_parts + 1), n=parallelism)):
num_dbgen_partitions = num_batches * parallelism
for batch_idx, part_indices in enumerate(
batch(range(1, num_dbgen_partitions + 1), n=parallelism)
):
logger.info("Partition %s: Generating CSV files", part_indices)
with Pool(parallelism) as process_pool:
process_pool.starmap(
gen_csv,
[
(part_idx, base_path, scale_factor, num_parts)
(part_idx, base_path, scale_factor, num_dbgen_partitions)
for part_idx in part_indices
],
)
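For context, the batch(...) helper used above is defined elsewhere in the script and is not part of this diff. A minimal sketch of the chunking it presumably performs, splitting an iterable into lists of at most n items (the name and keyword argument are taken from the call site; the body is an assumption):

from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def batch(iterable: Iterable[T], n: int) -> Iterator[list[T]]:
    # Yield successive chunks of at most n items from the iterable.
    it = iter(iterable)
    while chunk := list(islice(it, n)):
        yield chunk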
@@ -88,20 +89,13 @@ def pipelined_data_generation(
for f in csv_files:
shutil.move(f, base_path / pathlib.Path(f).name)

gen_parquet(base_path, rows_per_file, partitioned=True, iteration_offset=i)
gen_parquet(base_path, rows_per_file, partitioned=True, batch_idx=batch_idx)
parquet_files = glob.glob(f"{base_path}/*.parquet") # noqa: PTH207

# Exclude static tables except for first iteration
exclude_static_tables = (
""
if i == 0
else " ".join([f'--exclude "*/{tbl}/*"' for tbl in STATIC_TABLES])
)

if len(aws_s3_sync_location):
subprocess.check_output(
shlex.split(
f'aws s3 sync {scratch_dir} {aws_s3_sync_location}/scale-factor-{scale_factor} --exclude "*" --include "*.parquet" {exclude_static_tables}'
f'aws s3 sync {scratch_dir} {aws_s3_sync_location}/scale-{scale_factor} --exclude "*" --include "*.parquet"'
)
)
for parquet_file in parquet_files:
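Note: the exclusion of static tables has moved out of the aws s3 sync command (the removed --exclude "*/{tbl}/*" flags above) and into gen_parquet, which now skips tables listed in STATIC_TABLES for every batch after the first; the sync destination prefix also changes from scale-factor-{scale_factor} to scale-{scale_factor}.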
@@ -197,9 +191,12 @@ def gen_parquet(
base_path: pathlib.Path,
rows_per_file: int = 500_000,
partitioned: bool = False,
iteration_offset: int = 0,
batch_idx: int = 0,
) -> None:
for table_name, columns in table_columns.items():
if table_name in STATIC_TABLES and batch_idx != 0:
continue

path = base_path / f"{table_name}.tbl*"

lf = pl.scan_csv(
@@ -214,9 +211,18 @@
lf = lf.select(columns)

if partitioned:
(base_path / table_name).mkdir(parents=True, exist_ok=True)
path = base_path / table_name / f"{iteration_offset}_{{part}}.parquet"
lf.sink_parquet(pl.PartitionMaxSize(path, max_size=rows_per_file))

def partition_file_name(ctx: pl.BasePartitionContext) -> pathlib.Path:
partition = f"{batch_idx}_{ctx.file_idx}"
(base_path / table_name / partition).mkdir(parents=True, exist_ok=True) # noqa: B023
return pathlib.Path(partition) / "part.parquet"

path = base_path / table_name
lf.sink_parquet(
pl.PartitionMaxSize(
path, file_path=partition_file_name, max_size=rows_per_file
)
)
else:
path = base_path / f"{table_name}.parquet"
lf.sink_parquet(path)
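Note: with the file_path callback, each partitioned table ends up as one parquet file per partition directory named {batch_idx}_{file_idx}, e.g. (illustrative, for --num-batches=4) data/tables/scale-1/4/lineitem/0_0/part.parquet, data/tables/scale-1/4/lineitem/0_1/part.parquet, and so on. This matches the */part.{ext} glob that queries/common_utils.py expects, and tables listed in STATIC_TABLES are only written during the first batch.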
@@ -242,7 +248,11 @@ def gen_parquet(
type=int,
)
parser.add_argument(
"--num-parts", default=32, help="Number of parts to generate", type=int
"--num-batches",
default=None,
help="Number of batches used to generate the data",
type=int,
nargs="?",
)
parser.add_argument(
"--aws-s3-sync-location",
@@ -257,7 +267,7 @@
)
args = parser.parse_args()

if args.num_parts == 1:
if args.num_batches is None:
# Assumes the tables are already created by the Makefile
gen_parquet(
pathlib.Path(args.tpch_gen_folder),
Expand All @@ -268,7 +278,7 @@ def gen_parquet(
pipelined_data_generation(
args.tpch_gen_folder,
args.scale_factor,
args.num_parts,
args.num_batches,
args.aws_s3_sync_location,
parallelism=args.parallelism,
rows_per_file=args.rows_per_file,
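For reference: running .venv/bin/python -m scripts.prepare_data --tpch_gen_folder="data/tables/scale-1" without --num-batches converts the already-generated .tbl files into one parquet file per table, while adding --num-batches=4 --parallelism=8 (values illustrative) switches to the pipelined path that writes the partitioned {num_batches}/{table}/{batch_idx}_{file_idx}/part.parquet layout.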
1 change: 1 addition & 0 deletions settings.py
@@ -77,6 +77,7 @@ class Plot(BaseSettings):

class Settings(BaseSettings):
scale_factor: float = 1.0
num_batches: int | None = None

paths: Paths = Paths()
plot: Plot = Plot()