
Commit f2fb634

Implement a geoparquet writer (#899)
1 parent 39a8abf commit f2fb634

10 files changed: +429 additions, -49 deletions

baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/OvertureMapsBenchmark.java

Lines changed: 4 additions & 2 deletions
@@ -83,14 +83,16 @@ public void setup() throws IOException {
   @SuppressWarnings({"squid:S1481", "squid:S2201"})
   @Benchmark
   public void read() {
-    GeoParquetReader reader = new GeoParquetReader(directory.toUri());
+    var path = new org.apache.hadoop.fs.Path(directory.toUri());
+    GeoParquetReader reader = new GeoParquetReader(path);
     reader.read().count();
   }

   @SuppressWarnings({"squid:S1481", "squid:S2201"})
   @Benchmark
   public void readParallel() {
-    GeoParquetReader reader = new GeoParquetReader(directory.toUri());
+    var path = new org.apache.hadoop.fs.Path(directory.toUri());
+    GeoParquetReader reader = new GeoParquetReader(path);
     reader.readParallel().count();
   }
 }

baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/SmallFileBenchmark.java

Lines changed: 4 additions & 3 deletions
@@ -61,16 +61,17 @@ public void setup() throws IOException {
   @SuppressWarnings({"squid:S1481", "squid:S2201"})
   @Benchmark
   public void read() {
-    GeoParquetReader reader =
-        new GeoParquetReader(Path.of("baremaps-benchmarking/data/small/*.parquet").toUri());
+    var path = new org.apache.hadoop.fs.Path("baremaps-benchmarking/data/small/*.parquet");
+    GeoParquetReader reader = new GeoParquetReader(path);
     reader.read().count();
   }

   @SuppressWarnings({"squid:S1481", "squid:S2201"})
   @Benchmark
   public void readParallel() {
+    var path = new org.apache.hadoop.fs.Path("baremaps-benchmarking/data/small/*.parquet");
     GeoParquetReader reader =
-        new GeoParquetReader(Path.of("baremaps-benchmarking/data/small/*.parquet").toUri());
+        new GeoParquetReader(path);
     reader.readParallel().count();
   }
 }

baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java

Lines changed: 2 additions & 1 deletion
@@ -24,6 +24,7 @@
 import org.apache.baremaps.data.storage.*;
 import org.apache.baremaps.geoparquet.GeoParquetException;
 import org.apache.baremaps.geoparquet.GeoParquetReader;
+import org.apache.hadoop.fs.Path;

 public class GeoParquetDataTable implements DataTable {

@@ -35,7 +36,7 @@ public class GeoParquetDataTable implements DataTable {

   public GeoParquetDataTable(URI path) {
     this.path = path;
-    this.reader = new GeoParquetReader(path);
+    this.reader = new GeoParquetReader(new Path(path));
   }

   @Override

baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java

Lines changed: 1 addition & 1 deletion
@@ -105,7 +105,7 @@ public int getFieldRepetitionCount(int fieldIndex) {
     }
   }

-  private Object getValue(int fieldIndex, int index) {
+  Object getValue(int fieldIndex, int index) {
     Object value = data[fieldIndex];
     if (value instanceof List<?>list) {
       return list.get(index);

baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java

Lines changed: 12 additions & 14 deletions
@@ -20,7 +20,6 @@
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
-import java.net.URI;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -53,31 +52,31 @@ public class GeoParquetReader {
   /**
    * Constructs a new {@code GeoParquetReader}.
    *
-   * @param uri the URI to read from
+   * @param path the path to read from
    */
-  public GeoParquetReader(URI uri) {
-    this(uri, null, new Configuration());
+  public GeoParquetReader(Path path) {
+    this(path, null, new Configuration());
   }

   /**
    * Constructs a new {@code GeoParquetReader}.
    *
-   * @param uri the URI to read from
+   * @param path the path to read from
    * @param envelope the envelope to filter records
    */
-  public GeoParquetReader(URI uri, Envelope envelope) {
-    this(uri, envelope, new Configuration());
+  public GeoParquetReader(Path path, Envelope envelope) {
+    this(path, envelope, new Configuration());
   }

   /**
    * Constructs a new {@code GeoParquetReader}.
    *
-   * @param uri the URI to read from
+   * @param path the path to read from
    * @param configuration the configuration
    */
-  public GeoParquetReader(URI uri, Envelope envelope, Configuration configuration) {
+  public GeoParquetReader(Path path, Envelope envelope, Configuration configuration) {
     this.configuration = configuration;
-    this.files = initializeFiles(uri, configuration);
+    this.files = initializeFiles(path, configuration);
     this.envelope = envelope;
   }

@@ -168,11 +167,10 @@ private FileInfo getFileInfo(FileStatus fileStatus) {
     }
   }

-  private static List<FileStatus> initializeFiles(URI uri, Configuration configuration) {
+  private static List<FileStatus> initializeFiles(Path path, Configuration configuration) {
     try {
-      Path globPath = new Path(uri.getPath());
-      FileSystem fileSystem = FileSystem.get(uri, configuration);
-      FileStatus[] fileStatuses = fileSystem.globStatus(globPath);
+      FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
+      FileStatus[] fileStatuses = fileSystem.globStatus(path);
       if (fileStatuses == null) {
         throw new GeoParquetException("No files found at the specified URI.");
       }
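
Usage note (not part of the diff): with this change the reader is constructed from a Hadoop Path rather than a URI, and the path may contain a glob that initializeFiles() expands via FileSystem.globStatus(). A minimal sketch, assuming the Envelope parameter is the JTS Envelope used elsewhere in Baremaps and that the glob pattern below is illustrative:

import org.apache.baremaps.geoparquet.GeoParquetReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.locationtech.jts.geom.Envelope;

public class GeoParquetReaderExample {
  public static void main(String[] args) {
    // A Hadoop Path may contain a glob; it is expanded with FileSystem.globStatus().
    Path path = new Path("data/*.parquet");

    // Read all records.
    long total = new GeoParquetReader(path).read().count();

    // Read only records intersecting a bounding box, with an explicit Hadoop configuration.
    Envelope envelope = new Envelope(-180, 180, -90, 90);
    long filtered = new GeoParquetReader(path, envelope, new Configuration()).read().count();

    System.out.println(total + " " + filtered);
  }
}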
baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java

Lines changed: 138 additions & 0 deletions

@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.baremaps.geoparquet;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.*;
+
+/**
+ * WriteSupport implementation for writing GeoParquetGroup instances to Parquet.
+ */
+public class GeoParquetWriteSupport extends WriteSupport<GeoParquetGroup> {
+
+  private Configuration configuration;
+  private final MessageType schema;
+  private final GeoParquetMetadata metadata;
+  private RecordConsumer recordConsumer;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+
+  /**
+   * Constructs a new GeoParquetWriteSupport.
+   *
+   * @param schema the Parquet schema
+   * @param metadata the GeoParquet metadata
+   */
+  public GeoParquetWriteSupport(MessageType schema, GeoParquetMetadata metadata) {
+    this.schema = schema;
+    this.metadata = metadata;
+  }
+
+  @Override
+  public WriteContext init(Configuration configuration) {
+    Map<String, String> extraMetadata = new HashMap<>();
+    String geoMetadataJson = serializeMetadata(metadata);
+    extraMetadata.put("geo", geoMetadataJson);
+    return new WriteContext(schema, extraMetadata);
+  }
+
+  @Override
+  public void prepareForWrite(RecordConsumer recordConsumer) {
+    this.recordConsumer = recordConsumer;
+  }
+
+  @Override
+  public void write(GeoParquetGroup group) {
+    recordConsumer.startMessage();
+    writeGroup(group, schema, true);
+    recordConsumer.endMessage();
+  }
+
+  private void writeGroup(GeoParquetGroup group, GroupType groupType, boolean isRoot) {
+    if (!isRoot) {
+      recordConsumer.startGroup();
+    }
+    for (int i = 0; i < groupType.getFieldCount(); i++) {
+      Type fieldType = groupType.getType(i);
+      String fieldName = fieldType.getName();
+      int repetitionCount = group.getFieldRepetitionCount(i);
+      if (repetitionCount == 0) {
+        continue; // Skip if no values are present
+      }
+      for (int j = 0; j < repetitionCount; j++) {
+        recordConsumer.startField(fieldName, i);
+        if (fieldType.isPrimitive()) {
+          Object value = group.getValue(i, j);
+          writePrimitive(value, fieldType.asPrimitiveType());
+        } else {
+          GeoParquetGroup childGroup = group.getGroup(i, j);
+          writeGroup(childGroup, fieldType.asGroupType(), false);
+        }
+        recordConsumer.endField(fieldName, i);
+      }
+    }
+    if (!isRoot) {
+      recordConsumer.endGroup();
+    }
+  }
+
+  private void writePrimitive(Object value, PrimitiveType primitiveType) {
+    if (value == null) {
+      // The Parquet format does not support writing null values directly.
+      // If the field is optional and the value is null, we simply do not write it.
+      return;
+    }
+    switch (primitiveType.getPrimitiveTypeName()) {
+      case INT32:
+        recordConsumer.addInteger((Integer) value);
+        break;
+      case INT64:
+        recordConsumer.addLong((Long) value);
+        break;
+      case FLOAT:
+        recordConsumer.addFloat((Float) value);
+        break;
+      case DOUBLE:
+        recordConsumer.addDouble((Double) value);
+        break;
+      case BOOLEAN:
+        recordConsumer.addBoolean((Boolean) value);
+        break;
+      case BINARY, FIXED_LEN_BYTE_ARRAY:
+        recordConsumer.addBinary((Binary) value);
+        break;
+      default:
+        throw new GeoParquetException(
+            "Unsupported type: " + primitiveType.getPrimitiveTypeName());
+    }
+  }
+
+  private String serializeMetadata(GeoParquetMetadata metadata) {
+    try {
+      return objectMapper.writeValueAsString(metadata);
+    } catch (JsonProcessingException e) {
+      throw new GeoParquetException("Failed to serialize GeoParquet metadata", e);
+    }
+  }
+}
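
Usage note (not part of the diff): init() serializes the GeoParquetMetadata to JSON and stores it under the "geo" key in the Parquet footer's key-value metadata. A minimal sketch of how that footer entry could be read back with standard parquet-hadoop APIs, assuming the file path below is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class GeoMetadataCheck {
  public static void main(String[] args) throws Exception {
    // Path of a file produced by GeoParquetWriter (illustrative).
    Path path = new Path("output/features.parquet");
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      // GeoParquetWriteSupport.init() puts the serialized metadata under the "geo" key.
      String geo = reader.getFooter().getFileMetaData().getKeyValueMetaData().get("geo");
      System.out.println(geo);
    }
  }
}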
baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java

Lines changed: 98 additions & 0 deletions

@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.baremaps.geoparquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.conf.ParquetConfiguration;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * A writer for GeoParquet files that writes GeoParquetGroup instances to a Parquet file.
+ */
+public class GeoParquetWriter {
+
+  private GeoParquetWriter() {
+    // Prevent instantiation
+  }
+
+  public static Builder builder(Path file) {
+    return new Builder(file);
+  }
+
+  public static class Builder
+      extends ParquetWriter.Builder<GeoParquetGroup, GeoParquetWriter.Builder> {
+
+    private MessageType type = null;
+
+    private GeoParquetMetadata metadata = null;
+
+    private Builder(Path file) {
+      super(file);
+    }
+
+    /**
+     * Replace the message type with the specified one.
+     *
+     * @param type the message type
+     * @return the builder
+     */
+    public GeoParquetWriter.Builder withType(MessageType type) {
+      this.type = type;
+      return this;
+    }
+
+    /**
+     * Replace the metadata with the specified one.
+     *
+     * @param metadata the metadata
+     * @return the builder
+     */
+    public GeoParquetWriter.Builder withGeoParquetMetadata(GeoParquetMetadata metadata) {
+      this.metadata = metadata;
+      return this;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected WriteSupport<GeoParquetGroup> getWriteSupport(Configuration conf) {
+      // We don't need access to the hadoop configuration for now
+      return getWriteSupport((ParquetConfiguration) null);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected WriteSupport<GeoParquetGroup> getWriteSupport(ParquetConfiguration conf) {
+      return new GeoParquetWriteSupport(type, metadata);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected GeoParquetWriter.Builder self() {
+      return this;
+    }
+  }
+}
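
Usage note (not part of the diff): a minimal sketch of the new builder, copying one GeoParquet file into another. It assumes that GeoParquetReader.read() yields a stream of GeoParquetGroup instances (as the benchmarks' read().count() calls suggest) and that a suitable MessageType and GeoParquetMetadata are supplied by the caller; how to derive them from the source file is not shown here.

import java.io.IOException;
import org.apache.baremaps.geoparquet.GeoParquetGroup;
import org.apache.baremaps.geoparquet.GeoParquetMetadata;
import org.apache.baremaps.geoparquet.GeoParquetReader;
import org.apache.baremaps.geoparquet.GeoParquetWriter;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.schema.MessageType;

public class GeoParquetCopyExample {

  // Copies the records of one GeoParquet file into another. The schema and the
  // GeoParquet metadata are assumed to be provided by the caller, e.g. derived
  // from the source dataset.
  static void copy(Path source, Path target, MessageType schema, GeoParquetMetadata metadata)
      throws IOException {
    GeoParquetReader reader = new GeoParquetReader(source);
    try (ParquetWriter<GeoParquetGroup> writer = GeoParquetWriter.builder(target)
        .withType(schema)
        .withGeoParquetMetadata(metadata)
        .build()) {
      // Assumption: read() yields the GeoParquetGroup instances that write() consumes.
      var groups = reader.read().iterator();
      while (groups.hasNext()) {
        writer.write(groups.next());
      }
    }
  }
}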
