Skip to content

Commit 56f7d78

Browse files
committed
GPU summing aggregator test
1 parent 091461f commit 56f7d78

File tree

2 files changed

+294
-0
lines changed

2 files changed

+294
-0
lines changed

cpp/src/cmdline/CMakeLists.txt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,28 @@ target_link_libraries(chunk_reader PRIVATE gpu_compact::io)
3838
target_include_directories(chunk_reader PRIVATE "${CMAKE_BINARY_DIR}/configured_files/include")
3939
target_include_directories(chunk_reader PRIVATE "${CMAKE_SOURCE_DIR}/include/cmdline")
4040

41+
add_executable(
42+
sum_chunk_reader
43+
sum_chunk_reader.cpp
44+
configure_logging.cpp
45+
../../include/cmdline/configure_logging.hpp
46+
data_sink.cpp
47+
../../include/cmdline/data_sink.hpp
48+
lub.cpp
49+
../../include/cmdline/lub.hpp
50+
slice.cpp
51+
../../include/cmdline/slice.hpp)
52+
target_link_system_libraries(
53+
sum_chunk_reader
54+
PRIVATE
55+
CLI11::CLI11
56+
fmt::fmt
57+
spdlog::spdlog
58+
cudf::cudf)
59+
target_link_libraries(sum_chunk_reader PRIVATE gpu_compact::io)
60+
target_include_directories(sum_chunk_reader PRIVATE "${CMAKE_BINARY_DIR}/configured_files/include")
61+
target_include_directories(sum_chunk_reader PRIVATE "${CMAKE_SOURCE_DIR}/include/cmdline")
62+
4163
add_executable(
4264
check_sort
4365
check_sort.cpp

cpp/src/cmdline/sum_chunk_reader.cpp

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
#include "configure_logging.hpp"

#include <CLI/CLI.hpp>// NOLINT

#include <cudf/aggregation.hpp>
#include <cudf/concatenate.hpp>
#include <cudf/copying.hpp>
#include <cudf/groupby.hpp>
#include <cudf/io/datasource.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/merge.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/error.hpp>
#include <internal_use_only/config.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/owning_wrapper.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>
#ifdef SPDLOG_ACTIVE_LEVEL
#undef SPDLOG_ACTIVE_LEVEL
#endif
#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_DEBUG
#include <spdlog/spdlog.h>

#include "data_sink.hpp"
#include "io/prefetch_source.hpp"
#include "io/s3_utils.hpp"
#include "lub.hpp"
#include "slice.hpp"

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <iterator>
#include <limits>
#include <memory>
#include <numeric>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
43+
44+
// Total number of rows read so far across all readers.
//
// `readers` is any container of tuples whose element 3 is the per-reader
// cumulative row count (the other tuple elements are ignored here).
// Fix: std::accumulate is declared in <numeric>, which the file did not
// include; [[nodiscard]] added for consistency with the other helpers.
template<typename Readers>
[[nodiscard]] std::size_t calcRowsRead(Readers const &readers) noexcept {
    return std::accumulate(
      readers.cbegin(), readers.cend(), std::size_t{ 0 }, [](std::size_t acc, auto const &item) constexpr noexcept {
          return acc + std::get<3>(item);
      });
}
50+
51+
// Total number of rows across a collection of table views.
//
// `views` is any container whose elements expose num_rows() (e.g.
// cudf::table_view). Fix: std::accumulate is declared in <numeric>, which the
// file did not include; [[nodiscard]] added for consistency with the other
// helpers; the count is widened explicitly to avoid signed/unsigned mixing.
template<typename Views>
[[nodiscard]] std::size_t calcRowsInViews(Views const &views) noexcept {
    return std::accumulate(
      views.cbegin(), views.cend(), std::size_t{ 0 }, [](std::size_t acc, auto const &item) constexpr noexcept {
          return acc + static_cast<std::size_t>(item.num_rows());
      });
}
57+
58+
// Current time on the monotonic steady clock; used for elapsed-time and
// completion-estimate reporting in the main loop.
[[nodiscard]] std::chrono::time_point<std::chrono::steady_clock> timestamp() noexcept {
    using clock = std::chrono::steady_clock;
    return clock::now();
}
61+
62+
// Obtain table metadata (the output schema for the "key" and "timestamp"
// columns) by reading a single row of the given Parquet file.
[[nodiscard]] cudf::io::table_metadata grabMetaData(std::string const &file) {
    auto builder = cudf::io::parquet_reader_options::builder(cudf::io::source_info(file));
    builder.columns({ "key", "timestamp" });
    builder.num_rows(1);
    auto result = cudf::io::read_parquet(builder.build());
    return std::move(result.metadata);
}
69+
70+
// Entry point: chunked merging compaction with a grouped SUM aggregation.
//
// Reads the "key" and "timestamp" columns from each input Parquet file via
// cuDF chunked readers, keeping at most one in-memory table per reader.
// Each pass: (1) tops up any table holding fewer than `epsilon` rows,
// (2) splits all tables at the least upper bound of the sort column so only
// safely-mergeable prefixes are combined (unless --always-merge-all),
// (3) merges the prefixes, groups on column 0 and sums column 1, and
// (4) writes the aggregated result, retaining the unmerged suffixes.
// Loops until every reader is exhausted and no rows remain in memory.
//
// Fixes vs. original: CLI::App program name was "chunk_reader" (copy-paste
// from the sibling tool); `table.release()` leaked each table in the
// merge-all branch — `release()` abandons ownership without deleting.
int main(int argc, char **argv) {
    configure_logging();
    // NOLINTNEXTLINE
    CLI::App app{ "Simple program based chunking compaction algorithm with cuDF", "sum_chunk_reader" };
    app.set_version_flag("--version", std::string{ gpu_compact::cmake::project_version });

    std::string outputFile;
    app.add_option("output", outputFile, "Output path for Parquet file")->required();
    std::vector<std::string> inputFiles;
    app.add_option("input", inputFiles, "Input Parquet files")->required()->expected(1, -1);
    std::size_t chunkReadLimit{ 1024 };
    app.add_option("-c,--chunk-read-limit", chunkReadLimit, "cuDF Parquet reader chunk read limit in MiB");
    std::size_t passReadLimit{ 1024 };
    app.add_option("-p,--pass-read-limit", passReadLimit, "cuDF Parquet reader pass read limit in MiB");
    std::size_t epsilon{ 20'000 };
    app.add_option("-e,--epsilon", epsilon, "Lower bound for rows remaining in a table before loading next chunk");
    bool alwaysMergeAll{ false };
    app.add_flag("-a,--always-merge-all",
      alwaysMergeAll,
      "Use broken algorithm that doesn't sort properly. Useful for testing. DO NOT USE!");

    CLI11_PARSE(app, argc, argv);// NOLINT

    // force gpu initialization so it's not included in the time
    rmm::cuda_stream_default.synchronize();
    gpu_compact::s3::initialiseAWS();
    {
        auto s3client = gpu_compact::s3::makeClient();

        // Pool allocator over the CUDA device memory resource (95% of free VRAM)
        // to avoid per-allocation cudaMalloc overhead.
        auto cuda_mr = std::make_shared<rmm::mr::cuda_memory_resource>();
        auto mr =
          rmm::mr::make_owning_wrapper<rmm::mr::pool_memory_resource>(cuda_mr, rmm::percent_of_free_device_memory(95));
        rmm::mr::set_current_device_resource(mr.get());

        // Container for all data sources and Parquet readers:
        // tuple = (data source, chunked reader, chunks read, rows read).
        std::vector<std::tuple<std::unique_ptr<cudf::io::datasource>,
          std::unique_ptr<cudf::io::chunked_parquet_reader>,
          std::size_t,
          std::size_t>>
          readers;
        readers.reserve(inputFiles.size());

        // Make readers and find total row count.
        // We create the data source and disable prefetching while we read the footer.
        std::size_t totalRows = 0;
        for (auto const &f : inputFiles) {
            auto source =
              std::make_unique<gpu_compact::io::PrefetchingSource>(f, cudf::io::datasource::create(f), false);
            totalRows += cudf::io::read_parquet_metadata(cudf::io::source_info(source.get())).num_rows();
            auto reader_builder = cudf::io::parquet_reader_options::builder(cudf::io::source_info(&*source))
                                    .columns({ "key", "timestamp" });
            readers.emplace_back(std::move(source),
              std::make_unique<cudf::io::chunked_parquet_reader>(
                chunkReadLimit * 1'048'576, passReadLimit * 1'048'576, reader_builder.build()),
              0,
              0);
            // Enable pre-fetching after footer read
            dynamic_cast<gpu_compact::io::PrefetchingSource *>(std::get<0>(readers.back()).get())->prefetch(true);
        }

        // Grab metadata for schema from first file; all inputs are assumed to
        // share this schema — TODO confirm with callers.
        auto const tableMetadata = grabMetaData(inputFiles[0]);
        // Make writer
        SinkInfoDetails sinkDetails = make_writer(outputFile, tableMetadata, s3client);
        auto &writer = *sinkDetails.writer;

        SPDLOG_INFO("Starting compaction on {:d} files containing {:d} total rows", inputFiles.size(), totalRows);
        // Remaining parts initially empty; sentinel forces at least one pass.
        std::vector<std::unique_ptr<cudf::table>> tables{ readers.size() };
        std::size_t rowsInMemory = std::numeric_limits<std::size_t>::max();
        std::size_t rowsWritten = 0;
        auto const startTime = timestamp();
        // Loop doing reads
        while (rowsInMemory) {
            rowsInMemory = 0;
            bool allReadersFinished = true;
            // Loop through each reader
            for (std::size_t rc = 0; auto &[src, reader, chunkNo, rowCount] : readers) {
                auto &oldTable = tables[rc];
                // If reader has data and we need some, perform a read
                SPDLOG_INFO("Reader {:d}", rc);
                if (reader->has_next()) {
                    allReadersFinished = false;
                    SPDLOG_INFO(" Reader has rows");
                    // std::cmp_less: num_rows() is a signed cudf::size_type,
                    // epsilon is std::size_t — compare without implicit conversion.
                    if (!oldTable || std::cmp_less(oldTable->num_rows(), epsilon)) {
                        if (oldTable) {
                            SPDLOG_INFO(" We only have {:d} in memory", oldTable->num_rows());
                        } else {
                            SPDLOG_INFO(" No previous data in memory");
                        }
                        // Read a chunk
                        SPDLOG_INFO(" Read chunk: {:d}", chunkNo);
                        auto readTable = reader->read_chunk();
                        auto const rowsInChunk = readTable.metadata.num_rows_per_source.at(0);
                        SPDLOG_INFO(" Read chunk of {:d} rows", rowsInChunk);
                        // Increment chunk number in reader and add to row count
                        chunkNo++;
                        rowCount += rowsInChunk;

                        // Now concat the old part to the new chunk
                        std::unique_ptr<cudf::table> concat =
                          (oldTable) ? cudf::concatenate(std::vector{ oldTable->view(), readTable.tbl->view() })
                                     : std::move(readTable.tbl);
                        oldTable = std::move(concat);
                        SPDLOG_INFO(" New table has {:d} rows", tables[rc]->num_rows());
                    }
                } else {
                    SPDLOG_INFO(" Reader {:d} has no more rows", rc);
                }

                // Update overall count
                // If "all Merge" was true last iteration, the pointer may be null
                rowsInMemory += (oldTable) ? oldTable->num_rows() : 0;
                rc++;
            }

            SPDLOG_INFO("There are {:d} rows to process", rowsInMemory);
            // Merge and write tables
            if (rowsInMemory == 0) { break; }

            // If all readers have run out of data, then we have all remaining data in memory and safely
            // merge everything
            bool const mergeAll = alwaysMergeAll || allReadersFinished;
            std::pair<std::vector<cudf::table_view>, std::vector<cudf::table_view>> tableVectors;

            if (mergeAll) {
                std::transform(
                  tables.cbegin(), tables.cend(), std::back_inserter(tableVectors.first), [](auto &&t) noexcept {
                      return t->view();
                  });
            } else {
                // Find the least upper bound in sort column across these tables
                auto const leastUpperBound = findLeastUpperBound(tables, 0);

                // Now take search "needle" from last row from of table with LUB
                auto const lubTable = tables[leastUpperBound]->select({ 0 });
                auto const needle = cudf::split(lubTable, { lubTable.num_rows() - 1 })[1];

                // Split all tables at the needle
                tableVectors = splitAtNeedle(needle, tables);
            }

            // Merge all the upper parts of the tables
            std::size_t rowsToWrite = calcRowsInViews(tableVectors.first);
            SPDLOG_INFO("Merging {:d} rows", rowsToWrite);
            auto merged = cudf::merge(tableVectors.first, { 0 }, { cudf::order::ASCENDING });

            // Perform SUM aggregation on 2nd column, grouping on 1st column.
            // The merged table is pre-sorted on column 0, so tell groupby
            // (cudf::sorted::YES) to use the fast sorted-keys path.
            std::vector<std::unique_ptr<cudf::groupby_aggregation>> aggregations;
            aggregations.push_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
            cudf::groupby::aggregation_request aggRequest{ merged->view().column(1), std::move(aggregations) };
            cudf::groupby::groupby grouper{ merged->select({ 0 }),
                cudf::null_policy::EXCLUDE,
                cudf::sorted::YES,
                { cudf::order::ASCENDING },
                { cudf::null_order::BEFORE } };

            std::vector<cudf::groupby::aggregation_request> reqs;
            reqs.push_back(std::move(aggRequest));
            std::pair<std::unique_ptr<cudf::table>, std::vector<cudf::groupby::aggregation_result>> aggResults =
              grouper.aggregate(std::move(reqs));

            // Re-assemble an output table: unique keys followed by summed values.
            std::vector<std::unique_ptr<cudf::column>> aggCols = aggResults.first->release();
            aggCols.push_back(std::move(aggResults.second.front().results.front()));
            std::unique_ptr<cudf::table> aggTable = std::make_unique<cudf::table>(std::move(aggCols));

            // Duplicate the unmerged parts of the tables, so we can
            // opportunistically clear the original tables we no longer need
            for (std::size_t idx = 0; auto &&table : tables) {
                if (mergeAll) {
                    // reset(), not release(): release() abandons ownership and
                    // would leak the whole device-side table every pass.
                    table.reset();
                } else {
                    table = std::make_unique<cudf::table>(tableVectors.second[idx]);
                }
                idx++;
            }

            writer.write(*aggTable);
            rowsWritten += rowsToWrite;

            // Progress report with a naive linear time-to-completion estimate.
            auto const elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(timestamp() - startTime);
            auto const rowsRead = calcRowsRead(readers);
            auto const fracRowsRead = (static_cast<double>(rowsRead) / totalRows);
            auto const predictedTime =
              std::chrono::duration_cast<std::chrono::seconds>(elapsedTime * (1 / fracRowsRead));
            SPDLOG_INFO(
              "Read {:d} rows, Wrote {:d} rows, {:.2f}% complete, est. time (total) {:02d}:{:02d} ({:02d}:{:02d})",
              rowsRead,
              rowsWritten,
              fracRowsRead * 100,
              elapsedTime.count() / 60,
              elapsedTime.count() % 60,
              predictedTime.count() / 60,
              predictedTime.count() % 60);
        }

        writer.close();
        SPDLOG_INFO("Finished, read/wrote {:d} rows from {:d} readers", rowsWritten, inputFiles.size());
    }
    gpu_compact::s3::shutdownAWS();
}

0 commit comments

Comments
 (0)