38
38
#include < utility>
39
39
#include < vector>
40
40
41
/// @brief Sum the per-reader cumulative row counters.
///
/// Each reader entry is a tuple whose element at index 3 holds the running
/// count of rows read so far (incremented by the main read loop), so the
/// total is simply the sum of that field across all entries.
///
/// @param readers iterable of reader-state tuples supporting std::get<3>
/// @return total number of rows read across all readers
template<typename Readers>
std::size_t calcRowsRead(Readers const &readers) noexcept {
    std::size_t total{ 0 };
    for (auto const &entry : readers) {
        total += std::get<3>(entry);
    }
    return total;
}
47
47
48
/// @brief Total the number of rows held across a collection of table views.
///
/// Used by the merge step to report how many rows are about to be merged
/// and written.
///
/// @param views iterable of view-like objects exposing num_rows()
/// @return sum of num_rows() over every view
template<typename Views>
std::size_t calcRowsInViews(Views const &views) noexcept {
    std::size_t total{ 0 };
    for (auto const &view : views) {
        total += view.num_rows();
    }
    return total;
}
54
+
48
55
/// @brief Capture the current reading of the monotonic (steady) clock.
///
/// The main loop uses successive readings to compute elapsed time and an
/// estimated time to completion; steady_clock guarantees the readings
/// never go backwards.
///
/// @return the steady clock's current time_point
[[nodiscard]] std::chrono::time_point<std::chrono::steady_clock> timestamp () noexcept {
    using Clock = std::chrono::steady_clock;
    return Clock::now();
}
@@ -114,82 +121,92 @@ int main(int argc, char **argv) {
114
121
SinkInfoDetails sinkDetails = make_writer (outputFile, tableMetadata, s3client);
115
122
auto &writer = *sinkDetails.writer ;
116
123
117
- SPDLOG_INFO (" Start reading files" );
124
+ SPDLOG_INFO (" Starting compaction on {:d} files containing {:d} total rows " , inputFiles. size (), totalRows );
118
125
// Remaining parts initially empty
119
- std::vector<std::unique_ptr<cudf::table>> remainingParts{ readers.size () };
120
- std::size_t lastTotalRowCount = std::numeric_limits<std::size_t >::max ();
126
+ std::vector<std::unique_ptr<cudf::table>> tables{ readers.size () };
127
+ std::size_t rowsInMemory = std::numeric_limits<std::size_t >::max ();
128
+ std::size_t rowsWritten = 0 ;
121
129
auto const startTime = timestamp ();
122
130
// Loop doing reads
123
- while (lastTotalRowCount ) {
124
- lastTotalRowCount = 0 ;
131
+ while (rowsInMemory ) {
132
+ rowsInMemory = 0 ;
125
133
// Loop through each reader
126
134
for (std::size_t rc = 0 ; auto &[src, reader, chunkNo, rowCount] : readers) {
135
+ auto &oldTable = tables[rc];
127
136
// If reader has data and we need some, perform a read
128
137
SPDLOG_INFO (" Reader {:d}" , rc);
129
138
if (reader->has_next ()) {
130
- SPDLOG_INFO (" Reader has rows" );
131
- if (!remainingParts[rc] || remainingParts[rc]->num_rows () < epsilon) {
132
- SPDLOG_INFO (
133
- " No previous table or we only have {:d} in memory" , remainingParts[rc]->num_rows ());
134
-
139
+ SPDLOG_INFO (" Reader has rows" );
140
+ if (!oldTable || oldTable->num_rows () < epsilon) {
141
+ if (oldTable) {
142
+ SPDLOG_INFO (" We only have {:d} in memory" , oldTable->num_rows ());
143
+ } else {
144
+ SPDLOG_INFO (" No previous data in memory" );
145
+ }
135
146
// Read a chunk
136
147
SPDLOG_INFO (" Read chunk: {:d}" , chunkNo);
137
- auto table = reader->read_chunk ();
138
- auto const rowsInChunk = table .metadata .num_rows_per_source .at (0 );
148
+ auto readTable = reader->read_chunk ();
149
+ auto const rowsInChunk = readTable .metadata .num_rows_per_source .at (0 );
139
150
SPDLOG_INFO (" Read chunk of {:d} rows" , rowsInChunk);
140
151
// Increment chunk number in reader and add to row count
141
152
chunkNo++;
142
153
rowCount += rowsInChunk;
143
154
144
155
// Now concat the old part to the new chunk
145
156
std::unique_ptr<cudf::table> concat =
146
- cudf::concatenate (std::vector{ remainingParts[rc]->view (), table.tbl ->view () });
147
- remainingParts[rc] = std::move (concat);
148
- SPDLOG_INFO (" New table has {:d} rows" , remainingParts[rc]->num_rows ());
157
+ (oldTable) ? cudf::concatenate (std::vector{ oldTable->view (), readTable.tbl ->view () })
158
+ : std::move (readTable.tbl );
159
+ oldTable = std::move (concat);
160
+ SPDLOG_INFO (" New table has {:d} rows" , tables[rc]->num_rows ());
149
161
}
150
162
} else {
151
163
SPDLOG_INFO (" Reader {:d} has no more rows" , rc);
152
164
}
153
165
154
166
// Update overall count
155
- lastTotalRowCount += remainingParts[rc] ->num_rows ();
167
+ rowsInMemory += oldTable ->num_rows ();
156
168
rc++;
157
169
}
158
170
171
+ SPDLOG_INFO (" There are {:d} rows to process" , rowsInMemory);
159
172
// Merge and write tables
160
- if (lastTotalRowCount > 0 ) {
173
+ if (rowsInMemory > 0 ) {
161
174
// Find the least upper bound in sort column across these tables
162
- auto const leastUpperBound = findLeastUpperBound (remainingParts , 0 );
175
+ auto const leastUpperBound = findLeastUpperBound (tables , 0 );
163
176
164
177
// Now take search "needle" from last row from of table with LUB
165
- auto const lubTable = remainingParts [leastUpperBound]->select ({ 0 });
178
+ auto const lubTable = tables [leastUpperBound]->select ({ 0 });
166
179
auto const needle = cudf::split (lubTable, { lubTable.num_rows () - 1 })[1 ];
167
180
168
181
// Split all tables at the needle
169
182
std::pair<std::vector<cudf::table_view>, std::vector<cudf::table_view>> const tableVectors =
170
- splitAtNeedle (needle, remainingParts );
183
+ splitAtNeedle (needle, tables );
171
184
172
185
// Merge all the upper parts of the tables
173
- SPDLOG_INFO (" Merging {:d} rows" , lastTotalRowCount);
186
+ std::size_t rowsToWrite = calcRowsInViews (tableVectors.first );
187
+ SPDLOG_INFO (" Merging {:d} rows" , rowsToWrite);
174
188
auto merged = cudf::merge (tableVectors.first , { 0 }, { cudf::order::ASCENDING });
175
189
176
190
// Duplicate the unmerged parts of the tables, so we can opportunistically clear the original
177
191
// tables we no longer need
178
- for (std::size_t idx = 0 ; auto &&table : remainingParts ) {
192
+ for (std::size_t idx = 0 ; auto &&table : tables ) {
179
193
table = std::make_unique<cudf::table>(tableVectors.second [idx]);
180
194
idx++;
181
195
}
182
196
183
197
writer.write (*merged);
198
+ rowsWritten += rowsToWrite;
184
199
185
200
auto const elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(timestamp () - startTime);
186
- auto const rowsWritten = calcRowsWritten (readers);
187
- auto const fracRowsWritten = (static_cast <double >(rowsWritten ) / totalRows);
201
+ auto const rowsRead = calcRowsRead (readers);
202
+ auto const fracRowsRead = (static_cast <double >(rowsRead ) / totalRows);
188
203
auto const predictedTime =
189
- std::chrono::duration_cast<std::chrono::seconds>(elapsedTime * (1 / fracRowsWritten));
190
- SPDLOG_INFO (" Written {:d} rows, {:.2f}% complete, est. time (total) {:02d}:{:02d} ({:02d}:{:02d})" ,
204
+ std::chrono::duration_cast<std::chrono::seconds>(elapsedTime * (1 / fracRowsRead));
205
+ SPDLOG_INFO (
206
+ " Read {:d} rows, Wrote {:d} rows, {:.2f}% complete, est. time (total) {:02d}:{:02d} ({:02d}:{:02d})" ,
207
+ rowsRead,
191
208
rowsWritten,
192
- fracRowsWritten * 100 ,
209
+ fracRowsRead * 100 ,
193
210
elapsedTime.count () / 60 ,
194
211
elapsedTime.count () % 60 ,
195
212
predictedTime.count () / 60 ,
@@ -199,9 +216,6 @@ int main(int argc, char **argv) {
199
216
200
217
writer.close ();
201
218
202
- // Grab total row count from each reader
203
- auto const rowsWritten = calcRowsWritten (readers);
204
-
205
219
SPDLOG_INFO (" Finished, read/wrote {:d} rows from {:d} readers" , rowsWritten, inputFiles.size ());
206
220
}
207
221
gpu_compact::s3::shutdownAWS ();
0 commit comments