Skip to content

Commit 81cca48

Browse files
committed
Added a check sort app and fixed metadata bug in chunk reader
1 parent 3092b62 commit 81cca48

File tree

5 files changed

+124
-9
lines changed

5 files changed

+124
-9
lines changed

cpp/include/cmdline/data_sink.hpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@ struct SinkInfoDetails
1919
[[nodiscard]] std::unique_ptr<cudf::io::data_sink> make_data_sink(std::convertible_to<std::string> auto const &path,
2020
std::shared_ptr<Aws::S3::S3Client> &s3client);
2121

22-
cudf::io::chunked_parquet_writer_options_builder write_opts(cudf::io::sink_info const &sink) noexcept;
22+
cudf::io::chunked_parquet_writer_options_builder write_opts(cudf::io::sink_info const &sink,
23+
cudf::io::table_metadata const &metadata) noexcept;
2324

24-
[[no_discard]] SinkInfoDetails make_writer(std::string const &path, std::shared_ptr<Aws::S3::S3Client> &s3client);
25+
/**
 * @brief Creates a chunked Parquet writer for the given output path.
 *
 * @param path Destination path handed to make_data_sink().
 * @param metadata Table metadata forwarded to the writer options.
 * @param s3client S3 client handed to make_data_sink().
 * @return The sink details bundle; the result must not be discarded.
 */
[[nodiscard]] SinkInfoDetails make_writer(std::string const &path,
  cudf::io::table_metadata const &metadata,
  std::shared_ptr<Aws::S3::S3Client> &s3client);
2528

2629
/**
 * @brief Convenience overload accepting any path type implicitly convertible to std::string.
 *
 * @param path Destination path; converted to std::string before forwarding.
 * @param metadata Table metadata forwarded to the std::string overload.
 * @param s3client S3 client forwarded to the std::string overload.
 * @return Whatever the std::string overload of make_writer() returns.
 */
[[nodiscard]] SinkInfoDetails make_writer(std::convertible_to<std::string> auto const &path,
  cudf::io::table_metadata const &metadata,
  std::shared_ptr<Aws::S3::S3Client> &s3client) {
    // Convert before forwarding. Passing `path` through unchanged would make this template the
    // best match again (exact match beats a user-defined conversion) and recurse without end.
    std::string const pathString = path;
    return make_writer(pathString, metadata, s3client);
}

cpp/src/cmdline/CMakeLists.txt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,18 @@ target_link_system_libraries(
3737
target_link_libraries(chunk_reader PRIVATE gpu_compact::io)
3838
target_include_directories(chunk_reader PRIVATE "${CMAKE_BINARY_DIR}/configured_files/include")
3939
target_include_directories(chunk_reader PRIVATE "${CMAKE_SOURCE_DIR}/include/cmdline")
40+
41+
add_executable(
42+
check_sort
43+
check_sort.cpp
44+
configure_logging.cpp
45+
../../include/cmdline/configure_logging.hpp)
46+
target_link_system_libraries(
47+
check_sort
48+
PRIVATE
49+
CLI11::CLI11
50+
fmt::fmt
51+
spdlog::spdlog
52+
cudf::cudf)
53+
target_include_directories(check_sort PRIVATE "${CMAKE_BINARY_DIR}/configured_files/include")
54+
target_include_directories(check_sort PRIVATE "${CMAKE_SOURCE_DIR}/include/cmdline")

cpp/src/cmdline/check_sort.cpp

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
#include "configure_logging.hpp"
2+
3+
#include <CLI/CLI.hpp>// NOLINT
4+
5+
#include <cudf/concatenate.hpp>
6+
#include <cudf/io/datasource.hpp>
7+
#include <cudf/io/parquet.hpp>
8+
#include <cudf/io/types.hpp>
9+
#include <cudf/sorting.hpp>
10+
#include <cudf/table/table.hpp>
11+
#include <cudf/table/table_view.hpp>
12+
#include <cudf/types.hpp>
13+
#include <cudf/utilities/error.hpp>
14+
#include <internal_use_only/config.hpp>
15+
#include <rmm/cuda_stream_view.hpp>
16+
#include <rmm/mr/device/owning_wrapper.hpp>
17+
#include <rmm/mr/device/pool_memory_resource.hpp>
18+
#ifdef SPDLOG_ACTIVE_LEVEL
19+
#undef SPDLOG_ACTIVE_LEVEL
20+
#endif
21+
#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_DEBUG
22+
#include <spdlog/spdlog.h>
23+
24+
#include <cstddef>
25+
#include <memory>
26+
#include <string>
27+
28+
/**
 * @brief Checks that one column of a Parquet file is sorted in ascending order.
 *
 * The file is read in chunks. Each chunk is validated together with the previously read
 * chunk so that an ordering violation straddling a chunk boundary is also detected.
 *
 * @param argc Number of command line arguments.
 * @param argv Command line arguments: input Parquet file, then the column name.
 * @return 0 when every chunk is sorted, 1 when an ordering violation is found
 *         (command line parse failures return the code produced by CLI11_PARSE).
 */
int main(int argc, char **argv) {
    configure_logging();
    // NOLINTNEXTLINE
    CLI::App app{ "Simple program to check if Parquet file is sorted with cuDF", "check_sort" };
    app.set_version_flag("--version", std::string{ gpu_compact::cmake::project_version });

    std::string inputFile;
    app.add_option("input", inputFile, "Input Parquet file")->required();
    std::string colName;
    // Positional option name kept free of whitespace, consistent with "input" above.
    app.add_option("column", colName, "Column to validate sort order")->required();
    CLI11_PARSE(app, argc, argv);// NOLINT

    // force gpu initialization so it's not included in the time
    rmm::cuda_stream_default.synchronize();

    auto cuda_mr = std::make_shared<rmm::mr::cuda_memory_resource>();
    auto mr =
      rmm::mr::make_owning_wrapper<rmm::mr::pool_memory_resource>(cuda_mr, rmm::percent_of_free_device_memory(95));
    rmm::mr::set_current_device_resource(mr.get());

    // The same 500 MiB value is passed for both limit arguments of the chunked reader.
    constexpr ::size_t readLimitBytes = 500 * ::size_t{ 1'048'576 };
    auto opts =
      cudf::io::parquet_reader_options::builder(cudf::io::source_info(inputFile)).columns({ colName }).build();
    cudf::io::chunked_parquet_reader reader{ readLimitBytes, readLimitBytes, opts };

    SPDLOG_INFO("Validating sort on column '{}' in file '{}'", colName, inputFile);

    // Loop doing reads.
    // A sort problem may occur across a chunk boundary, so the last non-empty chunk is kept and the
    // current chunk is concatenated on to it before checking the combined table for sorting.
    ::size_t totalRowsRead = 0;
    ::size_t chunkNo = 0;
    cudf::io::table_with_metadata prevTable;
    while (reader.has_next()) {
        auto currentTable = reader.read_chunk();
        auto const rowsRead = static_cast<::size_t>(currentTable.tbl->num_rows());
        SPDLOG_INFO("Checking chunk number {:d} has {:d} rows", chunkNo, rowsRead);

        std::unique_ptr<cudf::table> checkTable;
        cudf::table_view checkView = currentTable.tbl->view();
        // First file row that can be part of a violation reported for this chunk.
        ::size_t suspectStart = totalRowsRead;
        if (prevTable.tbl) {
            checkTable = cudf::concatenate(
              std::vector<cudf::table_view>{ prevTable.tbl->view(), currentTable.tbl->view() });
            checkView = checkTable->view();
            // The previous chunk already passed on its own, so a new violation can start no earlier
            // than its last row. prevTable is only ever set from a non-empty chunk, hence
            // totalRowsRead >= 1 here and the subtraction cannot wrap.
            suspectStart = totalRowsRead - 1;
        }

        bool const chunkSorted =
          cudf::is_sorted(checkView, { cudf::order::ASCENDING }, { cudf::null_order::AFTER });
        if (!chunkSorted) {
            SPDLOG_ERROR("Chunk number {:d} contains an incorrect sort order between rows [{:d},{:d})",
              chunkNo,
              suspectStart,
              totalRowsRead + rowsRead);
            // Report failure through the exit code instead of letting an exception escape main().
            return 1;
        }

        ++chunkNo;
        totalRowsRead += rowsRead;
        // Keep the previous chunk when the current one is empty, otherwise the boundary between
        // the surrounding non-empty chunks would never be compared.
        if (rowsRead > 0) { prevTable = std::move(currentTable); }
    }

    SPDLOG_INFO("Finished, file is correctly sorted");
    return 0;
}

cpp/src/cmdline/chunk_reader.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,19 @@ ::size_t calcRowsWritten(auto const &readers) noexcept {
4242
});
4343
}
4444

45-
[[nodiscard]] inline std::chrono::time_point<std::chrono::steady_clock> timestamp() noexcept {
45+
/// @brief Returns the current time point of std::chrono::steady_clock.
[[nodiscard]] std::chrono::time_point<std::chrono::steady_clock> timestamp() noexcept {
    using Clock = std::chrono::steady_clock;
    return Clock::now();
}
4848

49+
/**
 * @brief Reads the table metadata of a Parquet file.
 *
 * @param file Path of the Parquet file to inspect.
 * @return The metadata member of the table returned by cudf::io::read_parquet().
 */
[[nodiscard]] cudf::io::table_metadata grabMetaData(std::string const &file) {
    // Only the metadata is needed, so request zero rows rather than decoding the whole file.
    auto const opts =
      cudf::io::parquet_reader_options::builder(cudf::io::source_info(file)).num_rows(0).build();
    return cudf::io::read_parquet(opts).metadata;
}
53+
4954
int main(int argc, char **argv) {
5055
configure_logging();
5156
// NOLINTNEXTLINE
52-
CLI::App app{ "Simple program to test spooling through Parquet files in chunks with cuDF", "chunk_reader" };
57+
CLI::App app{ "Simple program to test chunking compaction algorithm with cuDF", "chunk_reader" };
5358
app.set_version_flag("--version", std::string{ gpu_compact::cmake::project_version });
5459

5560
std::string outputFile;
@@ -98,8 +103,10 @@ int main(int argc, char **argv) {
98103
dynamic_cast<gpu_compact::io::PrefetchingSource *>(std::get<0>(readers.back()).get())->prefetch(true);
99104
}
100105

106+
// Grab metadata for schema from first file
107+
auto const tableMetadata = grabMetaData(inputFiles[0]);
101108
// Make writer
102-
SinkInfoDetails sinkDetails = make_writer(outputFile, s3client);
109+
SinkInfoDetails sinkDetails = make_writer(outputFile, tableMetadata, s3client);
103110
auto &writer = *sinkDetails.writer;
104111

105112
SPDLOG_INFO("Start reading files");

cpp/src/cmdline/data_sink.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
}
1414
}
1515

16-
cudf::io::chunked_parquet_writer_options_builder write_opts(cudf::io::sink_info const &sink) noexcept {
16+
cudf::io::chunked_parquet_writer_options_builder write_opts(cudf::io::sink_info const &sink,
17+
cudf::io::table_metadata const &metadata) noexcept {
1718
return cudf::io::chunked_parquet_writer_options::builder(sink)
1819
.compression(cudf::io::compression_type::ZSTD)
20+
.metadata(cudf::io::table_input_metadata{ metadata })
1921
.row_group_size_bytes(65 * 1'048'576)
2022
.row_group_size_rows(1'000'000)
2123
.max_page_size_bytes(512 * 1024)
@@ -25,9 +27,11 @@ cudf::io::chunked_parquet_writer_options_builder write_opts(cudf::io::sink_info
2527
.dictionary_policy(cudf::io::dictionary_policy::ADAPTIVE);
2628
}
2729

28-
[[no_discard]] SinkInfoDetails make_writer(std::string const &path, std::shared_ptr<Aws::S3::S3Client> &s3client) {
30+
/**
 * @brief Builds a data sink for the path and a chunked Parquet writer on top of it.
 *
 * @param path Destination path handed to make_data_sink().
 * @param metadata Table metadata handed to write_opts().
 * @param s3client S3 client handed to make_data_sink().
 * @return The sink info, the owning data sink pointer and the writer built from the options.
 */
[[nodiscard]] SinkInfoDetails make_writer(std::string const &path,
  cudf::io::table_metadata const &metadata,
  std::shared_ptr<Aws::S3::S3Client> &s3client) {
    auto data_sink = make_data_sink(path, s3client);
    // sink_info only borrows the pointer; ownership of the sink moves into the returned struct.
    cudf::io::sink_info sink{ data_sink.get() };
    auto wopts = write_opts(sink, metadata);
    return { sink, std::move(data_sink), std::make_unique<cudf::io::parquet_chunked_writer>(wopts.build()) };
}

0 commit comments

Comments
 (0)