Skip to content

Commit 2108997

Browse files
committed
Fix compile issues
1 parent d0ee726 commit 2108997

File tree

5 files changed

+15
-57
lines changed

5 files changed

+15
-57
lines changed

java/compaction/compaction-core/src/test/java/sleeper/compaction/core/task/CompactionTaskTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private void runTask(
154154
CompactionJobCommitterOrSendToLambda committer = new CompactionJobCommitterOrSendToLambda(
155155
tablePropertiesProvider(), stateStoreProvider(),
156156
jobTracker, stateStoreCommitQueue::add, batcherCommitQueue::add, timeSupplier);
157-
CompactionRunnerFactory selector = (job, properties) -> compactor;
157+
CompactionRunnerFactory selector = (job, instanceProperties, properties) -> compactor;
158158
new CompactionTask(instanceProperties, tablePropertiesProvider(), PropertiesReloader.neverReload(),
159159
stateStoreProvider(), messageReceiver, fileAssignmentCheck,
160160
committer, jobTracker, taskTracker, selector, taskId, jobRunIdSupplier, timeSupplier, sleeps::add)

java/compaction/compaction-gpu/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<parent>
2121
<artifactId>compaction</artifactId>
2222
<groupId>sleeper</groupId>
23-
<version>0.28.0-SNAPSHOT</version>
23+
<version>0.29.0-SNAPSHOT</version>
2424
</parent>
2525
<properties>
2626
<protobuf.version>3.25.1</protobuf.version>

java/compaction/compaction-gpu/src/main/java/sleeper/compaction/gpu/GPUCompactionRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@
3434
import sleeper.core.properties.table.TableProperties;
3535
import sleeper.core.range.Range;
3636
import sleeper.core.range.Region;
37-
import sleeper.core.record.process.RecordsProcessed;
3837
import sleeper.core.schema.Schema;
3938
import sleeper.core.schema.type.ByteArrayType;
4039
import sleeper.core.schema.type.IntType;
4140
import sleeper.core.schema.type.LongType;
4241
import sleeper.core.schema.type.PrimitiveType;
4342
import sleeper.core.schema.type.StringType;
43+
import sleeper.core.tracker.job.run.RecordsProcessed;
4444

4545
import java.io.IOException;
4646
import java.nio.ByteBuffer;

rust/compaction/src/datafusion.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,14 @@ pub async fn compact(
8282
})
8383
.inspect_err(|e| warn!("Error getting total input size {e}"));
8484
let multipart_size = std::cmp::max(
85+
crate::store::MULTIPART_BUF_SIZE,
86+
input_size.unwrap_or_default() / 5000,
87+
);
88+
store_factory
8589
.get_object_store(output_path)
8690
.map_err(|e| DataFusionError::External(e.into()))?
8791
.set_multipart_size_hint(multipart_size);
92+
info!(
8893
"Setting multipart size hint to {} bytes.",
8994
multipart_size.to_formatted_string(&Locale::en)
9095
);

rust/compaction/src/datafusion/udf.rs

Lines changed: 7 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@
1919
* See the License for the specific language governing permissions and
2020
* limitations under the License.
2121
*/
22+
use std::{
23+
any::Any,
24+
fmt::Debug,
25+
iter::zip,
26+
sync::{Mutex, MutexGuard},
27+
};
28+
2229
use arrow::{
2330
array::AsArray,
2431
datatypes::{
@@ -31,16 +38,6 @@ use datafusion::{
3138
prelude::Expr,
3239
scalar::ScalarValue,
3340
};
34-
use log::info;
35-
use num_format::{Locale, ToFormattedString};
36-
37-
use std::{
38-
any::Any,
39-
fmt::Debug,
40-
iter::zip,
41-
sync::{Mutex, MutexGuard},
42-
time::{Duration, Instant},
43-
};
4441

4542
use super::sketch::{update_sketch, DataSketchVariant, K};
4643

@@ -50,12 +47,10 @@ use super::sketch::{update_sketch, DataSketchVariant, K};
5047
/// so we can inject them into a sketch for later retrieval. The query should look something like:
5148
/// `SELECT sketch(row_key_col1, row_key_col2, ...), row_key_col2, value_col1, value_col2, ... FROM blah...`
5249
/// so the sketch function can see each row key column, but only returns the first.
53-
5450
pub(crate) struct SketchUDF {
5551
signature: Signature,
5652
invoke_count: Mutex<usize>,
5753
sketch: Mutex<Vec<DataSketchVariant>>,
58-
bench_stats: Mutex<BenchStats>,
5954
}
6055

6156
impl Debug for SketchUDF {
@@ -69,21 +64,13 @@ impl Debug for SketchUDF {
6964
}
7065

7166
impl SketchUDF {
72-
const BENCH_DURATION: Duration = Duration::from_secs(10);
7367
/// Create a new sketch function based on the schema of the row key fields.
7468
///
7569
pub fn new(schema: &DFSchema, row_keys: &[String]) -> Self {
7670
Self {
7771
signature: Signature::exact(get_row_key_types(schema, row_keys), Volatility::Immutable),
7872
invoke_count: Mutex::default(),
7973
sketch: Mutex::new(make_sketches_for_schema(schema, row_keys)),
80-
bench_stats: Mutex::new(BenchStats {
81-
start_time: Instant::now(),
82-
last_instant_measure: Instant::now(),
83-
last_instant_row_speed: 0,
84-
rows_since_instant: 0,
85-
rows_since_start: 0,
86-
}),
8774
}
8875
}
8976

@@ -94,35 +81,6 @@ impl SketchUDF {
9481
pub fn get_invoke_count(&self) -> usize {
9582
*self.invoke_count.lock().unwrap()
9683
}
97-
98-
fn bench_report(&self, rows: usize) {
99-
let mut stats_lock = self.bench_stats.lock().unwrap();
100-
stats_lock.rows_since_start += rows;
101-
stats_lock.rows_since_instant += rows;
102-
let now = Instant::now();
103-
104-
if now.duration_since(stats_lock.last_instant_measure) > Self::BENCH_DURATION {
105-
stats_lock.last_instant_row_speed = stats_lock.rows_since_instant as u64
106-
/ std::cmp::max(
107-
now.duration_since(stats_lock.last_instant_measure)
108-
.as_secs(),
109-
1,
110-
);
111-
stats_lock.last_instant_measure = now;
112-
stats_lock.rows_since_instant = 0;
113-
let rows_speed_from_start = stats_lock.rows_since_start as u64
114-
/ std::cmp::max(now.duration_since(stats_lock.start_time).as_secs(), 1);
115-
116-
info!(
117-
"Bench speeds: {} rows/sec. ({} second rolling avg.) {} rows/sec. lifetime avg.",
118-
stats_lock
119-
.last_instant_row_speed
120-
.to_formatted_string(&Locale::en),
121-
Self::BENCH_DURATION.as_secs(),
122-
rows_speed_from_start.to_formatted_string(&Locale::en)
123-
);
124-
}
125-
}
12684
}
12785

12886
/// Create a [`Vec`] of data types for this schema from the row keys.
@@ -191,7 +149,6 @@ impl ScalarUDFImpl for SketchUDF {
191149
for (sketch, col) in zip(sk_lock.iter_mut(), &args.args) {
192150
match col {
193151
ColumnarValue::Array(array) => {
194-
self.bench_report(array.len());
195152
// dynamic dispatch. Match the datatype to the type of sketch to update.
196153
match array.data_type() {
197154
DataType::Int32 => update_sketch(sketch, &array.as_primitive::<Int32Type>()),
@@ -229,23 +186,19 @@ impl ScalarUDFImpl for SketchUDF {
229186
| ScalarValue::LargeUtf8(Some(value))
230187
| ScalarValue::Utf8View(Some(value)),
231188
) => {
232-
self.bench_report(1);
233189
sketch.update(value);
234190
}
235191
ColumnarValue::Scalar(
236192
ScalarValue::Binary(Some(value))
237193
| ScalarValue::LargeBinary(Some(value))
238194
| ScalarValue::BinaryView(Some(value)),
239195
) => {
240-
self.bench_report(1);
241196
sketch.update(value);
242197
}
243198
ColumnarValue::Scalar(ScalarValue::Int32(Some(value))) => {
244-
self.bench_report(1);
245199
sketch.update(value);
246200
}
247201
ColumnarValue::Scalar(ScalarValue::Int64(Some(value))) => {
248-
self.bench_report(1);
249202
sketch.update(value);
250203
}
251204
x @ ColumnarValue::Scalar(_) => {

0 commit comments

Comments
 (0)