Skip to content

Commit e7dd716

Browse files
committed
Region fixes
1 parent 0e6d2fd commit e7dd716

File tree

3 files changed

+18
-11
lines changed

3 files changed

+18
-11
lines changed

java/compaction/compaction-datafusion/src/main/java/sleeper/compaction/datafusion/DataFusionCompactionRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ private static FFICommonConfig createCompactionParams(CompactionJob job, TablePr
126126
if (DataEngine.AGGREGATION_ITERATOR_NAME.equals(job.getIteratorClassName())) {
127127
params.iterator_config.set(job.getIteratorConfig());
128128
}
129-
FFISleeperRegion partitionRegion = new FFISleeperRegion(runtime, region);
129+
FFISleeperRegion partitionRegion = new FFISleeperRegion(runtime, schema, region);
130130
params.setRegion(partitionRegion);
131131
params.validate();
132132
return params;

java/foreign-bridge/src/main/java/sleeper/foreign/FFISleeperRegion.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020

2121
import sleeper.core.range.Range;
2222
import sleeper.core.range.Region;
23+
import sleeper.core.schema.Schema;
2324
import sleeper.foreign.bridge.FFIArray;
2425

26+
import java.util.List;
2527
import java.util.Objects;
2628

2729
/**
@@ -46,50 +48,55 @@ public class FFISleeperRegion extends Struct {
4648
public final FFIArray<java.lang.Boolean> maxs_inclusive = new FFIArray<>(this);
4749

4850
public FFISleeperRegion(jnr.ffi.Runtime runtime) {
49-
this(runtime, null);
51+
this(runtime, null, null);
5052
}
5153

5254
/**
5355
* Creates and validates Sleeper region into an FFI compatible form.
5456
*
5557
* @param runtime FFI runtime
58+
* @param schema the table schema
5659
* @param region the Sleeper partition region
5760
* @throws IllegalStateException when the region data is invalid
5861
*/
59-
public FFISleeperRegion(jnr.ffi.Runtime runtime, Region region) {
62+
public FFISleeperRegion(jnr.ffi.Runtime runtime, Schema schema, Region region) {
6063
super(runtime);
6164
if (region != null) {
62-
populateRegion(region);
65+
populateRegion(schema, region);
6366
}
6467
}
6568

6669
/**
6770
* Set the region data in this FFI object.
6871
*
72+
* @param schema the table schema
6973
* @param region region data to copy into FFI object
7074
* @throws NullPointerException if {@code region} is {@code null}
7175
*/
7276
@SuppressWarnings(value = "checkstyle:avoidNestedBlocks")
73-
public void populateRegion(Region region) {
77+
public void populateRegion(Schema schema, Region region) {
78+
Objects.requireNonNull(schema, "schema");
7479
Objects.requireNonNull(region, "region");
80+
List<java.lang.String> rowKeysOrdered = schema.getRowKeyFieldNames();
81+
List<Range> orderedRanges = rowKeysOrdered.stream().map(rowKeyName -> region.getRange(rowKeyName)).toList();
7582
// Extra braces: Make sure wrong array isn't populated to wrong pointers
7683
{
7784
// This array can't contain nulls
78-
Object[] regionMins = region.getRanges().stream().map(Range::getMin).toArray();
85+
Object[] regionMins = orderedRanges.stream().map(Range::getMin).toArray();
7986
this.mins.populate(regionMins, false);
8087
}
8188
{
82-
java.lang.Boolean[] regionMinInclusives = region.getRanges().stream().map(Range::isMinInclusive)
89+
java.lang.Boolean[] regionMinInclusives = orderedRanges.stream().map(Range::isMinInclusive)
8390
.toArray(java.lang.Boolean[]::new);
8491
this.mins_inclusive.populate(regionMinInclusives, false);
8592
}
8693
{
8794
// This array can contain nulls
88-
Object[] regionMaxs = region.getRanges().stream().map(Range::getMax).toArray();
95+
Object[] regionMaxs = orderedRanges.stream().map(Range::getMax).toArray();
8996
this.maxs.populate(regionMaxs, true);
9097
}
9198
{
92-
java.lang.Boolean[] regionMaxInclusives = region.getRanges().stream().map(Range::isMaxInclusive)
99+
java.lang.Boolean[] regionMaxInclusives = orderedRanges.stream().map(Range::isMaxInclusive)
93100
.toArray(java.lang.Boolean[]::new);
94101
this.maxs_inclusive.populate(regionMaxInclusives, false);
95102
}

java/query/query-datafusion/src/main/java/sleeper/query/datafusion/DataFusionLeafPartitionRowRetriever.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ private static FFILeafPartitionQueryConfig createFFIQueryData(LeafPartitionQuery
148148
if (DataEngine.AGGREGATION_ITERATOR_NAME.equals(query.getQueryTimeIteratorClassName())) {
149149
common.iterator_config.set(query.getQueryTimeIteratorConfig());
150150
}
151-
FFISleeperRegion partitionRegion = new FFISleeperRegion(runtime, query.getPartitionRegion());
151+
FFISleeperRegion partitionRegion = new FFISleeperRegion(runtime, dataReadSchema, query.getPartitionRegion());
152152
common.setRegion(partitionRegion);
153153
common.validate();
154154

@@ -161,7 +161,7 @@ private static FFILeafPartitionQueryConfig createFFIQueryData(LeafPartitionQuery
161161
queryConfig.requested_value_fields_set.set(false);
162162
}
163163

164-
FFISleeperRegion[] ffiRegions = query.getRegions().stream().map(region -> new FFISleeperRegion(runtime, region)).toArray(FFISleeperRegion[]::new);
164+
FFISleeperRegion[] ffiRegions = query.getRegions().stream().map(region -> new FFISleeperRegion(runtime, dataReadSchema, region)).toArray(FFISleeperRegion[]::new);
165165
queryConfig.setQueryRegions(ffiRegions);
166166
queryConfig.write_quantile_sketch.set(false);
167167
queryConfig.explain_plans.set(true);

0 commit comments

Comments
 (0)