Skip to content

Commit dc8ff32

Browse files
authored
Data collection refactoring (#955)
* Improve resource management * Persist size to memory in AppendOnlyLog * Make DataCollection extend AutoCloseable * Make DataMap extend AutoCloseable * Add DirectHashDataMap * Add builders to all data collections * Make data collections constructors private * Use the builders consistently in calling code * Improve the javadoc and make it more consistent * Clean unused files
1 parent e646718 commit dc8ff32

File tree

77 files changed

+1949
-501
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+1949
-501
lines changed

baremaps-core/src/main/java/org/apache/baremaps/workflow/WorkflowContext.java

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -73,34 +73,43 @@ public Map<Long, List<Long>> getReferenceMap() throws IOException {
7373

7474
private <T> DataMap<Long, T> getMemoryAlignedDataMap(String name, FixedSizeDataType<T> dataType)
7575
throws IOException {
76-
var coordinateDir = Files.createDirectories(cacheDir.resolve(name));
77-
return new MemoryAlignedDataMap<>(
78-
dataType,
79-
new MemoryMappedDirectory(coordinateDir));
76+
Path coordinateDir = Files.createDirectories(cacheDir.resolve(name));
77+
return MemoryAlignedDataMap.<T>builder()
78+
.dataType(dataType)
79+
.memory(new MemoryMappedDirectory(coordinateDir))
80+
.build();
8081
}
8182

8283
private <T> DataMap<Long, T> getMonotonicDataMap(String name, DataType<T> dataType)
8384
throws IOException {
84-
var mapDir = Files.createDirectories(cacheDir.resolve(name));
85-
var keysDir = Files.createDirectories(mapDir.resolve("keys"));
86-
var valuesDir = Files.createDirectories(mapDir.resolve("values"));
87-
return new MonotonicDataMap<>(
88-
new MemoryAlignedDataList<>(
89-
new PairDataType<>(new LongDataType(), new LongDataType()),
90-
new MemoryMappedDirectory(keysDir)),
91-
new AppendOnlyLog<>(
92-
dataType,
93-
new MemoryMappedDirectory(valuesDir)));
85+
Path mapDir = Files.createDirectories(cacheDir.resolve(name));
86+
Path keysDir = Files.createDirectories(mapDir.resolve("keys"));
87+
Path valuesDir = Files.createDirectories(mapDir.resolve("values"));
88+
MemoryAlignedDataList<PairDataType.Pair<Long, Long>> keys =
89+
MemoryAlignedDataList.<PairDataType.Pair<Long, Long>>builder()
90+
.dataType(new PairDataType<>(new LongDataType(), new LongDataType()))
91+
.memory(new MemoryMappedDirectory(keysDir))
92+
.build();
93+
AppendOnlyLog<T> values = AppendOnlyLog.<T>builder()
94+
.dataType(dataType)
95+
.values(new MemoryMappedDirectory(valuesDir))
96+
.build();
97+
return MonotonicDataMap.<T>builder()
98+
.keys(keys)
99+
.values(values)
100+
.build();
94101
}
95102

96103
private DataMap<Long, Coordinate> getMonotonicPairedDataMap(String name,
97104
DataType<Coordinate> dataType)
98105
throws IOException {
99-
var mapDir = Files.createDirectories(cacheDir.resolve(name));
100-
return new MonotonicPairedDataMap<>(
101-
new MemoryAlignedDataList<>(
102-
new PairDataType<>(new LongDataType(), new LonLatDataType()),
103-
new MemoryMappedDirectory(Files.createDirectories(mapDir))));
106+
Path mapDir = Files.createDirectories(cacheDir.resolve(name));
107+
return MonotonicPairedDataMap.<Coordinate>builder()
108+
.values(MemoryAlignedDataList.<PairDataType.Pair<Long, Coordinate>>builder()
109+
.dataType(new PairDataType<>(new LongDataType(), new LonLatDataType()))
110+
.memory(new MemoryMappedDirectory(Files.createDirectories(mapDir)))
111+
.build())
112+
.build();
104113
}
105114

106115
public void cleanCache() throws IOException {

baremaps-core/src/test/java/org/apache/baremaps/calcite/CalciteTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,11 @@ void test() throws SQLException {
8080
new DataColumnFixed("geometry", Cardinality.OPTIONAL, Type.GEOMETRY)));
8181
DataTable cityDataTable = new BaremapsDataTable(
8282
cityRowType,
83-
new IndexedDataList<>(new AppendOnlyLog<>(new RowDataType(cityRowType))));
83+
IndexedDataList.<DataRow>builder()
84+
.values(AppendOnlyLog.<DataRow>builder()
85+
.dataType(new RowDataType(cityRowType))
86+
.build())
87+
.build());
8488
cityDataTable.add(new DataRowImpl(cityDataTable.schema(),
8589
List.of(1, "Paris", geometryFactory.createPoint(new Coordinate(2.3522, 48.8566)))));
8690
cityDataTable.add(new DataRowImpl(cityDataTable.schema(),
@@ -94,7 +98,11 @@ void test() throws SQLException {
9498
new DataColumnFixed("population", Cardinality.OPTIONAL, Type.INTEGER)));
9599
DataTable populationDataTable = new BaremapsDataTable(
96100
populationRowType,
97-
new IndexedDataList<>(new AppendOnlyLog<>(new RowDataType(populationRowType))));
101+
IndexedDataList.<DataRow>builder()
102+
.values(AppendOnlyLog.<DataRow>builder()
103+
.dataType(new RowDataType(populationRowType))
104+
.build())
105+
.build());
98106
populationDataTable
99107
.add(new DataRowImpl(populationDataTable.schema(), List.of(1, 2_161_000)));
100108
populationDataTable

baremaps-core/src/test/java/org/apache/baremaps/tasks/ImportUpdateSampleTest.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,19 @@ void sample() throws Exception {
5555

5656
// Initialize the data maps
5757
Map<Long, Coordinate> coordinateMap = DataConversions.asMap(
58-
new IndexedDataMap<>(new AppendOnlyLog<>(new CoordinateDataType(), new OnHeapMemory())));
58+
IndexedDataMap.<Coordinate>builder()
59+
.values(AppendOnlyLog.<Coordinate>builder()
60+
.dataType(new CoordinateDataType())
61+
.memory(new OnHeapMemory())
62+
.build())
63+
.build());
5964
Map<Long, List<Long>> referenceMap = DataConversions.asMap(
60-
new IndexedDataMap<>(new AppendOnlyLog<>(new LongListDataType(), new OnHeapMemory())));
65+
IndexedDataMap.<List<Long>>builder()
66+
.values(AppendOnlyLog.<List<Long>>builder()
67+
.dataType(new LongListDataType())
68+
.memory(new OnHeapMemory())
69+
.build())
70+
.build());
6171

6272
// Import the sample data
6373
ImportOsmPbf.execute(TestFiles.SAMPLE_OSM_PBF, coordinateMap, referenceMap, headerRepository,

baremaps-data/src/main/java/org/apache/baremaps/data/collection/AppendOnlyLog.java

Lines changed: 105 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,10 @@
3030
import org.apache.baremaps.data.type.DataType;
3131

3232
/**
33-
* A log of records backed by a {@link DataType} and a {@link Memory}. Elements are appended to the
34-
* log and can be accessed by their position in the {@link Memory}. Appending elements to the log is
35-
* thread-safe.
33+
* A log of elements that can only be appended to. Elements are stored in memory and can be accessed
34+
* by their position. Append operations are thread-safe.
3635
*
37-
* @param <E> The type of the data.
36+
* @param <E> The type of elements in the log
3837
*/
3938
public class AppendOnlyLog<E> implements DataCollection<E> {
4039

@@ -47,21 +46,84 @@ public class AppendOnlyLog<E> implements DataCollection<E> {
4746
private final Lock lock = new ReentrantLock();
4847

4948
/**
50-
* Constructs an {@link AppendOnlyLog}.
49+
* Creates a new builder for an AppendOnlyLog.
5150
*
52-
* @param dataType the data type
51+
* @param <E> the type of elements
52+
* @return a new builder
5353
*/
54-
public AppendOnlyLog(DataType<E> dataType) {
55-
this(dataType, new OffHeapMemory());
54+
public static <E> Builder<E> builder() {
55+
return new Builder<>();
5656
}
5757

5858
/**
59-
* Constructs an append only log.
59+
* Builder for AppendOnlyLog.
60+
*
61+
* @param <E> the type of elements
62+
*/
63+
public static class Builder<E> {
64+
private DataType<E> dataType;
65+
private Memory<?> memory;
66+
67+
/**
68+
* Sets the data type for the log.
69+
*
70+
* @param dataType the data type
71+
* @return this builder
72+
*/
73+
public Builder<E> dataType(DataType<?> dataType) {
74+
@SuppressWarnings("unchecked")
75+
DataType<E> castedDataType = (DataType<E>) dataType;
76+
this.dataType = castedDataType;
77+
return this;
78+
}
79+
80+
/**
81+
* Sets the memory for the log.
82+
*
83+
* @param memory the memory
84+
* @return this builder
85+
*/
86+
public Builder<E> memory(Memory<?> memory) {
87+
this.memory = memory;
88+
return this;
89+
}
90+
91+
/**
92+
* Sets the memory for the log values.
93+
*
94+
* @param memory the memory
95+
* @return this builder
96+
*/
97+
public Builder<E> values(Memory<?> memory) {
98+
return memory(memory);
99+
}
100+
101+
/**
102+
* Builds a new AppendOnlyLog.
103+
*
104+
* @return a new AppendOnlyLog
105+
* @throws IllegalStateException if required parameters are missing
106+
*/
107+
public AppendOnlyLog<E> build() {
108+
if (dataType == null) {
109+
throw new IllegalStateException("Data type must be specified");
110+
}
111+
112+
if (memory == null) {
113+
memory = new OffHeapMemory();
114+
}
115+
116+
return new AppendOnlyLog<>(dataType, memory);
117+
}
118+
}
119+
120+
/**
121+
* Constructs an AppendOnlyLog.
60122
*
61123
* @param dataType the data type
62124
* @param memory the memory
63125
*/
64-
public AppendOnlyLog(DataType<E> dataType, Memory<?> memory) {
126+
private AppendOnlyLog(DataType<E> dataType, Memory<?> memory) {
65127
this.dataType = dataType;
66128
this.memory = memory;
67129
this.segmentSize = memory.segmentSize();
@@ -70,10 +132,18 @@ public AppendOnlyLog(DataType<E> dataType, Memory<?> memory) {
70132
}
71133

72134
/**
73-
* Appends the value to the log and returns its position in the memory.
135+
* Persists the current size to memory.
136+
*/
137+
public void persistSize() {
138+
memory.segment(0).putLong(0, size);
139+
}
140+
141+
/**
142+
* Appends an element to the log and returns its position in memory.
74143
*
75-
* @param value the value
76-
* @return the position of the value in the memory.
144+
* @param value the element to add
145+
* @return the position of the element in memory
146+
* @throws DataCollectionException if the element is too large for a segment
77147
*/
78148
public long addPositioned(E value) {
79149
int valueSize = dataType.size(value);
@@ -102,10 +172,10 @@ public long addPositioned(E value) {
102172
}
103173

104174
/**
105-
* Returns a values at the specified position in the memory.
175+
* Returns the element at the specified position in memory.
106176
*
107-
* @param position the position of the value
108-
* @return the value
177+
* @param position the position of the element
178+
* @return the element
109179
*/
110180
public E getPositioned(long position) {
111181
long segmentIndex = position / segmentSize;
@@ -130,32 +200,39 @@ public long size() {
130200
public void clear() {
131201
try {
132202
memory.clear();
203+
this.size = 0;
204+
persistSize();
133205
} catch (IOException e) {
134206
throw new DataCollectionException(e);
135207
}
136208
}
137209

138210
/**
139-
* Returns an iterator over the values of the log, starting at the beginning of the log. The
140-
* iterator allows to get the current position in the memory.
211+
* Returns an iterator over the elements of this log.
141212
*
142-
* @return an iterator over the values of the log
213+
* @return an iterator over the elements
143214
*/
144215
@Override
145216
public AppendOnlyLogIterator iterator() {
146217
return new AppendOnlyLogIterator(size);
147218
}
148219

220+
@Override
221+
public void close() throws Exception {
222+
try {
223+
memory.close();
224+
} catch (IOException e) {
225+
throw new DataCollectionException(e);
226+
}
227+
}
228+
149229
/**
150-
* An iterator over the values of the log that can be used to iterate over the values of the log
151-
* and to get the current position in the memory.
230+
* An iterator over the elements in this log.
152231
*/
153232
public class AppendOnlyLogIterator implements Iterator<E> {
154233

155234
private final long size;
156-
157235
private long index;
158-
159236
private long position;
160237

161238
private AppendOnlyLogIterator(long size) {
@@ -199,9 +276,13 @@ public E next() {
199276
return dataType.read(segment, (int) segmentOffset);
200277
}
201278

279+
/**
280+
* Returns the current position in memory.
281+
*
282+
* @return the current position
283+
*/
202284
public long getPosition() {
203285
return position;
204286
}
205-
206287
}
207288
}

0 commit comments

Comments
 (0)