Skip to content

Commit 6e3ddc9

Browse files
committed
Start adding simple rate monitor to DF
1 parent 730bfb1 commit 6e3ddc9

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

cpp/include/io/s3_sink.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
namespace gpu_compact::s3
1717
{
1818

19-
inline constexpr std::size_t DEFAULT_UPLOAD_SIZE = 50 * 1'048'576;
19+
inline constexpr std::size_t DEFAULT_UPLOAD_SIZE = 200 * 1'048'576;
2020

2121
struct S3Sink final : public cudf::io::data_sink
2222
{
@@ -63,4 +63,4 @@ struct S3Sink final : public cudf::io::data_sink
6363
std::size_t bytes_written() noexcept override;
6464
};
6565

66-
}// namespace gpu_compact::cudf_compact::s3
66+
}// namespace gpu_compact::s3

rust/compaction/src/aws_s3.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ pub trait ExtendedObjectStore: ObjectStore {
9090
/// Get the number of bytes read in requests.
9191
fn get_bytes_read(&self) -> Option<usize>;
9292

93+
fn set_total_predicted(&mut self, prediction: Option<usize>);
94+
9395
/// Trait upcasting.
9496
fn as_object_store(self: Arc<Self>) -> Arc<dyn ObjectStore>;
9597
}
@@ -196,6 +198,7 @@ pub async fn default_s3_config() -> color_eyre::Result<AmazonS3Builder> {
196198
pub struct LoggingObjectStore {
197199
store: Arc<dyn ObjectStore>,
198200
get_count: Mutex<usize>,
201+
total_predicted: Option<usize>,
199202
get_bytes_read: Mutex<usize>,
200203
capacity: Mutex<usize>,
201204
}
@@ -205,6 +208,7 @@ impl LoggingObjectStore {
205208
Self {
206209
store: inner,
207210
get_count: Mutex::new(0),
211+
total_predicted: None,
208212
get_bytes_read: Mutex::new(0),
209213
capacity: Mutex::new(MULTIPART_BUF_SIZE),
210214
}
@@ -277,6 +281,10 @@ impl ObjectStore for LoggingObjectStore {
277281
*self.get_bytes_read.lock().unwrap() += range.len();
278282
}
279283
*self.get_count.lock().unwrap() += 1;
284+
if let Some(total) = self.total_predicted {
285+
let percentComplete: <usize as Div<f64>>::Output =
286+
*self.get_count.lock().unwrap() / (total as f64);
287+
}
280288
self.store.get_opts(location, options)
281289
}
282290

@@ -432,6 +440,12 @@ impl ExtendedObjectStore for LoggingObjectStore {
432440
fn as_object_store(self: Arc<Self>) -> Arc<dyn ObjectStore> {
433441
self
434442
}
443+
444+
fn set_total_predicted(&mut self, prediction: Option<usize>) {
445+
if let Some(value) = prediction {
446+
self.total_predicted = Some(value);
447+
}
448+
}
435449
}
436450

437451
#[derive(Debug)]

0 commit comments

Comments
 (0)