Skip to content

Commit a0562c8

Browse files
chenyangfb authored and sdruzkin committed
Support reset all readers in startStripe()
1 parent e936ad0 commit a0562c8

9 files changed

+87
-15
lines changed

presto-orc/src/main/java/com/facebook/presto/orc/OrcReaderOptions.java

Lines changed: 19 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@ public class OrcReaderOptions
3131
private final boolean appendRowNumber;
3232
// slice reader will throw if the slice size is larger than this value
3333
private final DataSize maxSliceSize;
34+
private final boolean resetAllReaders;
3435

3536
/**
3637
* Read column statistics for flat map columns. Usually there are quite a
@@ -46,7 +47,8 @@ private OrcReaderOptions(
4647
boolean mapNullKeysEnabled,
4748
boolean appendRowNumber,
4849
boolean readMapStatistics,
49-
DataSize maxSliceSize)
50+
DataSize maxSliceSize,
51+
boolean resetAllReaders)
5052
{
5153
this.maxMergeDistance = requireNonNull(maxMergeDistance, "maxMergeDistance is null");
5254
this.maxBlockSize = requireNonNull(maxBlockSize, "maxBlockSize is null");
@@ -56,6 +58,7 @@ private OrcReaderOptions(
5658
this.appendRowNumber = appendRowNumber;
5759
this.readMapStatistics = readMapStatistics;
5860
this.maxSliceSize = maxSliceSize;
61+
this.resetAllReaders = resetAllReaders;
5962
}
6063

6164
public DataSize getMaxMergeDistance()
@@ -98,6 +101,11 @@ public DataSize getMaxSliceSize()
98101
return maxSliceSize;
99102
}
100103

104+
public boolean isResetAllReaders()
105+
{
106+
return resetAllReaders;
107+
}
108+
101109
@Override
102110
public String toString()
103111
{
@@ -110,6 +118,7 @@ public String toString()
110118
.add("appendRowNumber", appendRowNumber)
111119
.add("readMapStatistics", readMapStatistics)
112120
.add("maxSliceSize", maxSliceSize)
121+
.add("resetAllReaders", resetAllReaders)
113122
.toString();
114123
}
115124

@@ -128,6 +137,7 @@ public static final class Builder
128137
private boolean appendRowNumber;
129138
private boolean readMapStatistics;
130139
private DataSize maxSliceSize = DEFAULT_MAX_SLICE_SIZE;
140+
private boolean resetAllReaders;
131141

132142
private Builder() {}
133143

@@ -179,6 +189,12 @@ public Builder withMaxSliceSize(DataSize maxSliceSize)
179189
return this;
180190
}
181191

192+
public Builder withResetAllReaders(boolean resetAllReaders)
193+
{
194+
this.resetAllReaders = resetAllReaders;
195+
return this;
196+
}
197+
182198
public OrcReaderOptions build()
183199
{
184200
return new OrcReaderOptions(
@@ -189,7 +205,8 @@ public OrcReaderOptions build()
189205
mapNullKeysEnabled,
190206
appendRowNumber,
191207
readMapStatistics,
192-
maxSliceSize);
208+
maxSliceSize,
209+
resetAllReaders);
193210
}
194211
}
195212
}

presto-orc/src/main/java/com/facebook/presto/orc/OrcRecordReaderOptions.java

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ public class OrcRecordReaderOptions
2626
private final boolean mapNullKeysEnabled;
2727
private final boolean appendRowNumber;
2828
private final long maxSliceSize;
29+
private final boolean resetAllReaders;
2930

3031
public OrcRecordReaderOptions(OrcReaderOptions options)
3132
{
@@ -34,7 +35,8 @@ public OrcRecordReaderOptions(OrcReaderOptions options)
3435
options.getMaxBlockSize(),
3536
options.mapNullKeysEnabled(),
3637
options.appendRowNumber(),
37-
options.getMaxSliceSize());
38+
options.getMaxSliceSize(),
39+
options.isResetAllReaders());
3840
}
3941

4042
public OrcRecordReaderOptions(
@@ -43,7 +45,8 @@ public OrcRecordReaderOptions(
4345
DataSize maxBlockSize,
4446
boolean mapNullKeysEnabled,
4547
boolean appendRowNumber,
46-
DataSize maxSliceSize)
48+
DataSize maxSliceSize,
49+
boolean resetAllReaders)
4750
{
4851
this.maxMergeDistance = requireNonNull(maxMergeDistance, "maxMergeDistance is null");
4952
this.maxBlockSize = requireNonNull(maxBlockSize, "maxBlockSize is null");
@@ -53,6 +56,7 @@ public OrcRecordReaderOptions(
5356
checkArgument(maxSliceSize.toBytes() < Integer.MAX_VALUE, "maxSliceSize cannot be larger than Integer.MAX_VALUE");
5457
checkArgument(maxSliceSize.toBytes() > 0, "maxSliceSize must be positive");
5558
this.maxSliceSize = maxSliceSize.toBytes();
59+
this.resetAllReaders = resetAllReaders;
5660
}
5761

5862
public DataSize getMaxMergeDistance()
@@ -84,4 +88,9 @@ public long getMaxSliceSize()
8488
{
8589
return maxSliceSize;
8690
}
91+
92+
public boolean isResetAllReaders()
93+
{
94+
return resetAllReaders;
95+
}
8796
}

presto-orc/src/main/java/com/facebook/presto/orc/reader/BatchStreamReaders.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ public static BatchStreamReader createStreamReader(Type type, StreamDescriptor s
4040
case INT:
4141
case LONG:
4242
case DATE:
43-
return new LongBatchStreamReader(type, streamDescriptor, systemMemoryContext);
43+
return new LongBatchStreamReader(type, streamDescriptor, systemMemoryContext, options.isResetAllReaders());
4444
case FLOAT:
4545
return new FloatBatchStreamReader(type, streamDescriptor);
4646
case DOUBLE:
@@ -49,7 +49,7 @@ public static BatchStreamReader createStreamReader(Type type, StreamDescriptor s
4949
case STRING:
5050
case VARCHAR:
5151
case CHAR:
52-
return new SliceBatchStreamReader(type, streamDescriptor, systemMemoryContext, options.getMaxSliceSize());
52+
return new SliceBatchStreamReader(type, streamDescriptor, systemMemoryContext, options.getMaxSliceSize(), options.isResetAllReaders());
5353
case TIMESTAMP:
5454
case TIMESTAMP_MICROSECONDS:
5555
boolean enableMicroPrecision = type == TIMESTAMP_MICROSECONDS;

presto-orc/src/main/java/com/facebook/presto/orc/reader/LongBatchStreamReader.java

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -43,13 +43,15 @@ public class LongBatchStreamReader
4343
private final LongDirectBatchStreamReader directReader;
4444
private final LongDictionaryBatchStreamReader dictionaryReader;
4545
private BatchStreamReader currentReader;
46+
private final boolean resetAllReaders;
4647

47-
public LongBatchStreamReader(Type type, StreamDescriptor streamDescriptor, OrcAggregatedMemoryContext systemMemoryContext)
48+
public LongBatchStreamReader(Type type, StreamDescriptor streamDescriptor, OrcAggregatedMemoryContext systemMemoryContext, boolean resetAllReaders)
4849
throws OrcCorruptionException
4950
{
5051
this.streamDescriptor = requireNonNull(streamDescriptor, "stream is null");
5152
directReader = new LongDirectBatchStreamReader(type, streamDescriptor, systemMemoryContext.newOrcLocalMemoryContext(LongBatchStreamReader.class.getSimpleName()));
5253
dictionaryReader = new LongDictionaryBatchStreamReader(type, streamDescriptor, systemMemoryContext.newOrcLocalMemoryContext(LongBatchStreamReader.class.getSimpleName()));
54+
this.resetAllReaders = resetAllReaders;
5355
}
5456

5557
@Override
@@ -74,9 +76,17 @@ public void startStripe(Stripe stripe)
7476
.getColumnEncodingKind();
7577
if (kind == DIRECT || kind == DIRECT_V2 || kind == DWRF_DIRECT) {
7678
currentReader = directReader;
79+
if (dictionaryReader != null && resetAllReaders) {
80+
dictionaryReader.startStripe(stripe);
81+
System.setProperty("RESET_LONG_BATCH_READER", "RESET_LONG_BATCH_READER");
82+
}
7783
}
7884
else if (kind == DICTIONARY) {
7985
currentReader = dictionaryReader;
86+
if (directReader != null && resetAllReaders) {
87+
directReader.startStripe(stripe);
88+
System.setProperty("RESET_LONG_BATCH_READER", "RESET_LONG_BATCH_READER");
89+
}
8090
}
8191
else {
8292
throw new IllegalArgumentException("Unsupported encoding " + kind);

presto-orc/src/main/java/com/facebook/presto/orc/reader/LongSelectiveStreamReader.java

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -52,9 +52,10 @@ public LongSelectiveStreamReader(
5252
Optional<Type> outputType,
5353
OrcAggregatedMemoryContext systemMemoryContext,
5454
boolean isLowMemory,
55-
long maxSliceSize)
55+
long maxSliceSize,
56+
boolean resetAllReaders)
5657
{
57-
this.context = new SelectiveReaderContext(streamDescriptor, outputType, filter, systemMemoryContext, isLowMemory, maxSliceSize);
58+
this.context = new SelectiveReaderContext(streamDescriptor, outputType, filter, systemMemoryContext, isLowMemory, maxSliceSize, resetAllReaders);
5859
}
5960

6061
@Override
@@ -73,12 +74,20 @@ public void startStripe(Stripe stripe)
7374
directReader = new LongDirectSelectiveStreamReader(context);
7475
}
7576
currentReader = directReader;
77+
if (dictionaryReader != null && context.isResetAllReaders()) {
78+
dictionaryReader = null;
79+
System.setProperty("RESET_LONG_READER", "RESET_LONG_READER");
80+
}
7681
break;
7782
case DICTIONARY:
7883
if (dictionaryReader == null) {
7984
dictionaryReader = new LongDictionarySelectiveStreamReader(context);
8085
}
8186
currentReader = dictionaryReader;
87+
if (directReader != null && context.isResetAllReaders()) {
88+
directReader = null;
89+
System.setProperty("RESET_LONG_READER", "RESET_LONG_READER");
90+
}
8291
break;
8392
default:
8493
throw new IllegalArgumentException("Unsupported encoding " + kind);

presto-orc/src/main/java/com/facebook/presto/orc/reader/SelectiveReaderContext.java

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -41,14 +41,16 @@ public class SelectiveReaderContext
4141
private final OrcAggregatedMemoryContext systemMemoryContext;
4242
private final boolean isLowMemory;
4343
private final long maxSliceSize;
44+
private final boolean resetAllReaders;
4445

4546
public SelectiveReaderContext(
4647
StreamDescriptor streamDescriptor,
4748
Optional<Type> outputType,
4849
Optional<TupleDomainFilter> filter,
4950
OrcAggregatedMemoryContext systemMemoryContext,
5051
boolean isLowMemory,
51-
long maxSliceSize)
52+
long maxSliceSize,
53+
boolean resetAllReaders)
5254
{
5355
this.filter = requireNonNull(filter, "filter is null").orElse(null);
5456
this.streamDescriptor = requireNonNull(streamDescriptor, "streamDescriptor is null");
@@ -62,6 +64,7 @@ public SelectiveReaderContext(
6264
checkArgument(maxSliceSize < Integer.MAX_VALUE, "maxSliceSize cannot be larger than Integer.MAX_VALUE");
6365
checkArgument(maxSliceSize > 0, "maxSliceSize must be positive");
6466
this.maxSliceSize = maxSliceSize;
67+
this.resetAllReaders = resetAllReaders;
6568
}
6669

6770
public StreamDescriptor getStreamDescriptor()
@@ -116,4 +119,9 @@ public long getMaxSliceSize()
116119
{
117120
return maxSliceSize;
118121
}
122+
123+
public boolean isResetAllReaders()
124+
{
125+
return resetAllReaders;
126+
}
119127
}

presto-orc/src/main/java/com/facebook/presto/orc/reader/SelectiveStreamReaders.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -83,7 +83,7 @@ public static SelectiveStreamReader createStreamReader(
8383
case DATE: {
8484
checkArgument(requiredSubfields.isEmpty(), "Primitive type stream reader doesn't support subfields");
8585
verifyStreamType(streamDescriptor, outputType, t -> t instanceof BigintType || t instanceof IntegerType || t instanceof SmallintType || t instanceof DateType);
86-
return new LongSelectiveStreamReader(streamDescriptor, getOptionalOnlyFilter(type, filters), outputType, systemMemoryContext, isLowMemory, options.getMaxSliceSize());
86+
return new LongSelectiveStreamReader(streamDescriptor, getOptionalOnlyFilter(type, filters), outputType, systemMemoryContext, isLowMemory, options.getMaxSliceSize(), options.isResetAllReaders());
8787
}
8888
case FLOAT: {
8989
checkArgument(requiredSubfields.isEmpty(), "Float type stream reader doesn't support subfields");
@@ -100,7 +100,7 @@ public static SelectiveStreamReader createStreamReader(
100100
case CHAR:
101101
checkArgument(requiredSubfields.isEmpty(), "Primitive stream reader doesn't support subfields");
102102
verifyStreamType(streamDescriptor, outputType, t -> t instanceof VarcharType || t instanceof CharType || t instanceof VarbinaryType);
103-
return new SliceSelectiveStreamReader(streamDescriptor, getOptionalOnlyFilter(type, filters), outputType, systemMemoryContext, isLowMemory, options.getMaxSliceSize());
103+
return new SliceSelectiveStreamReader(streamDescriptor, getOptionalOnlyFilter(type, filters), outputType, systemMemoryContext, isLowMemory, options.getMaxSliceSize(), options.isResetAllReaders());
104104
case TIMESTAMP:
105105
case TIMESTAMP_MICROSECONDS: {
106106
boolean enableMicroPrecision = outputType.isPresent() && outputType.get() == TIMESTAMP_MICROSECONDS;

presto-orc/src/main/java/com/facebook/presto/orc/reader/SliceBatchStreamReader.java

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -54,15 +54,17 @@ public class SliceBatchStreamReader
5454
private final SliceDirectBatchStreamReader directReader;
5555
private final SliceDictionaryBatchStreamReader dictionaryReader;
5656
private BatchStreamReader currentReader;
57+
private final boolean resetAllReaders;
5758

58-
public SliceBatchStreamReader(Type type, StreamDescriptor streamDescriptor, OrcAggregatedMemoryContext systemMemoryContext, long maxSliceSize)
59+
public SliceBatchStreamReader(Type type, StreamDescriptor streamDescriptor, OrcAggregatedMemoryContext systemMemoryContext, long maxSliceSize, boolean resetAllReaders)
5960
throws OrcCorruptionException
6061
{
6162
requireNonNull(type, "type is null");
6263
verifyStreamType(streamDescriptor, type, t -> t instanceof VarcharType || t instanceof CharType || t instanceof VarbinaryType);
6364
this.streamDescriptor = requireNonNull(streamDescriptor, "stream is null");
6465
this.directReader = new SliceDirectBatchStreamReader(streamDescriptor, getMaxCodePointCount(type), isCharType(type), maxSliceSize);
6566
this.dictionaryReader = new SliceDictionaryBatchStreamReader(streamDescriptor, getMaxCodePointCount(type), isCharType(type), systemMemoryContext.newOrcLocalMemoryContext(SliceBatchStreamReader.class.getSimpleName()));
67+
this.resetAllReaders = resetAllReaders;
6668
}
6769

6870
@Override
@@ -87,9 +89,17 @@ public void startStripe(Stripe stripe)
8789
.getColumnEncodingKind();
8890
if (columnEncodingKind == DIRECT || columnEncodingKind == DIRECT_V2 || columnEncodingKind == DWRF_DIRECT) {
8991
currentReader = directReader;
92+
if (dictionaryReader != null && resetAllReaders) {
93+
dictionaryReader.startStripe(stripe);
94+
System.setProperty("RESET_SLICE_BATCH_READER", "RESET_SLICE_BATCH_READER");
95+
}
9096
}
9197
else if (columnEncodingKind == DICTIONARY || columnEncodingKind == DICTIONARY_V2) {
9298
currentReader = dictionaryReader;
99+
if (directReader != null && resetAllReaders) {
100+
directReader.startStripe(stripe);
101+
System.setProperty("RESET_SLICE_BATCH_READER", "RESET_SLICE_BATCH_READER");
102+
}
93103
}
94104
else {
95105
throw new IllegalArgumentException("Unsupported encoding " + columnEncodingKind);

presto-orc/src/main/java/com/facebook/presto/orc/reader/SliceSelectiveStreamReader.java

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -56,9 +56,10 @@ public SliceSelectiveStreamReader(
5656
Optional<Type> outputType,
5757
OrcAggregatedMemoryContext systemMemoryContext,
5858
boolean isLowMemory,
59-
long maxSliceSize)
59+
long maxSliceSize,
60+
boolean resetAllReaders)
6061
{
61-
this.context = new SelectiveReaderContext(streamDescriptor, outputType, filter, systemMemoryContext, isLowMemory, maxSliceSize);
62+
this.context = new SelectiveReaderContext(streamDescriptor, outputType, filter, systemMemoryContext, isLowMemory, maxSliceSize, resetAllReaders);
6263
}
6364

6465
public static int computeTruncatedLength(Slice slice, int offset, int length, int maxCodePointCount, boolean isCharType)
@@ -88,13 +89,21 @@ public void startStripe(Stripe stripe)
8889
directReader = new SliceDirectSelectiveStreamReader(context);
8990
}
9091
currentReader = directReader;
92+
if (dictionaryReader != null && context.isResetAllReaders()) {
93+
dictionaryReader = null;
94+
System.setProperty("RESET_SLICE_READER", "RESET_SLICE_READER");
95+
}
9196
break;
9297
case DICTIONARY:
9398
case DICTIONARY_V2:
9499
if (dictionaryReader == null) {
95100
dictionaryReader = new SliceDictionarySelectiveReader(context);
96101
}
97102
currentReader = dictionaryReader;
103+
if (directReader != null && context.isResetAllReaders()) {
104+
directReader = null;
105+
System.setProperty("RESET_SLICE_READER", "RESET_SLICE_READER");
106+
}
98107
break;
99108
default:
100109
throw new IllegalArgumentException("Unsupported encoding " + kind);

0 commit comments

Comments
 (0)