
Commit 39a8abf

Add a CSV data store (#901)
* Add a CSV datastore
* Add tests that parse CSV files and a sample GeoNames file
* Format the source code
* Fix issues identified by CodeQL and Sonar
1 parent 36584f4 commit 39a8abf

5 files changed: +623 -0 lines changed
CsvDataStore.java: 98 additions & 0 deletions

@@ -0,0 +1,98 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.baremaps.storage.csv;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.baremaps.data.storage.DataSchema;
import org.apache.baremaps.data.storage.DataStore;
import org.apache.baremaps.data.storage.DataStoreException;
import org.apache.baremaps.data.storage.DataTable;

/**
 * A DataStore implementation that manages a single CSV file.
 */
public class CsvDataStore implements DataStore {

  private final String tableName;
  private final DataSchema schema;
  private final CsvDataTable dataTable;

  /**
   * Constructs a CsvDataStore with the specified table name, schema, and CSV file.
   *
   * @param tableName the name of the table
   * @param schema the data schema defining the structure
   * @param csvFile the CSV file to read data from
   * @param hasHeader whether the CSV file includes a header row
   * @param separator the character used to separate columns in the CSV file
   * @throws IOException if an I/O error occurs
   */
  public CsvDataStore(String tableName, DataSchema schema, File csvFile, boolean hasHeader,
      char separator) throws IOException {
    this.tableName = tableName;
    this.schema = schema;
    this.dataTable = new CsvDataTable(schema, csvFile, hasHeader, separator);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public List<String> list() throws DataStoreException {
    return Collections.singletonList(tableName);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public DataTable get(String name) throws DataStoreException {
    if (this.tableName.equals(name)) {
      return dataTable;
    } else {
      throw new DataStoreException("Table '" + name + "' not found.");
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void add(DataTable table) throws DataStoreException {
    throw new UnsupportedOperationException("Adding tables is not supported in CsvDataStore.");
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void add(String name, DataTable table) throws DataStoreException {
    throw new UnsupportedOperationException("Adding tables is not supported in CsvDataStore.");
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void remove(String name) throws DataStoreException {
    throw new UnsupportedOperationException("Removing tables is not supported in CsvDataStore.");
  }
}
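
A minimal usage sketch of the new store may help when reading the diff. It is not part of the commit: the class name, the table name "geonames", and the semicolon separator are illustrative assumptions, and the DataSchema is assumed to be built elsewhere by the caller, since schema construction is outside this change.

package org.apache.baremaps.storage.csv;

import java.io.File;
import java.io.IOException;
import org.apache.baremaps.data.storage.DataSchema;
import org.apache.baremaps.data.storage.DataStoreException;
import org.apache.baremaps.data.storage.DataTable;

/**
 * Hypothetical usage sketch, not part of this commit.
 */
public class CsvDataStoreUsageSketch {

  static void printRows(DataSchema schema, File csvFile)
      throws IOException, DataStoreException {
    // Wrap the CSV file in a single-table store; "geonames" and ';' are illustrative.
    var store = new CsvDataStore("geonames", schema, csvFile, true, ';');

    // The store exposes exactly one table, registered under the constructor's name.
    DataTable table = store.get("geonames");
    System.out.println("rows: " + table.size());

    // Rows are produced lazily while streaming; values are parsed according to the
    // column types declared in the schema.
    table.stream().forEach(System.out::println);
  }
}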
CsvDataTable.java: 201 additions & 0 deletions

@@ -0,0 +1,201 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.baremaps.storage.csv;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.baremaps.data.storage.*;
import org.locationtech.jts.io.WKTReader;

/**
 * A DataTable implementation that reads data from a CSV file using Jackson.
 */
public class CsvDataTable implements DataTable {

  private final DataSchema schema;
  private final File csvFile;
  private final CsvSchema csvSchema;
  private final long size;

  /**
   * Constructs a CsvDataTable with the specified schema, CSV file, header presence, and separator.
   *
   * @param schema the data schema defining the structure
   * @param csvFile the CSV file to read data from
   * @param hasHeader whether the CSV file includes a header row
   * @param separator the character used to separate columns in the CSV file
   * @throws IOException if an I/O error occurs
   */
  public CsvDataTable(DataSchema schema, File csvFile, boolean hasHeader, char separator)
      throws IOException {
    this.schema = schema;
    this.csvFile = csvFile;
    this.csvSchema = buildCsvSchema(schema, hasHeader, separator);
    this.size = calculateSize();
  }

  /**
   * Builds the CsvSchema for Jackson based on the provided DataSchema, header presence, and
   * separator.
   *
   * @param dataSchema the data schema
   * @param hasHeader whether the CSV file includes a header row
   * @param separator the character used to separate columns
   * @return the CsvSchema for Jackson
   */
  private CsvSchema buildCsvSchema(DataSchema dataSchema, boolean hasHeader, char separator) {
    CsvSchema.Builder builder = CsvSchema.builder();
    for (DataColumn column : dataSchema.columns()) {
      builder.addColumn(column.name());
    }
    return builder.setUseHeader(hasHeader).setColumnSeparator(separator).build();
  }

  /**
   * Calculates the number of rows in the CSV file.
   *
   * @return the number of rows
   * @throws IOException if an I/O error occurs
   */
  private long calculateSize() throws IOException {
    try (var parser = new CsvMapper().readerFor(Map.class)
        .with(csvSchema)
        .createParser(csvFile)) {
      long rowCount = 0;
      while (parser.nextToken() != null) {
        if (parser.currentToken() == JsonToken.START_OBJECT) {
          rowCount++;
        }
      }
      return rowCount;
    }
  }

  @Override
  public DataSchema schema() {
    return schema;
  }

  @Override
  public boolean add(DataRow row) {
    throw new UnsupportedOperationException("Adding rows is not supported.");
  }

  @Override
  public void clear() {
    throw new UnsupportedOperationException("Clearing rows is not supported.");
  }

  @Override
  public long size() {
    return size;
  }

  @Override
  public Iterator<DataRow> iterator() {
    try {
      CsvMapper csvMapper = new CsvMapper();
      JsonParser parser = csvMapper.readerFor(Map.class)
          .with(csvSchema)
          .createParser(csvFile);

      Iterator<Map<String, String>> csvIterator = csvMapper.readerFor(Map.class)
          .with(csvSchema)
          .readValues(parser);

      return new Iterator<>() {
        @Override
        public boolean hasNext() {
          return csvIterator.hasNext();
        }

        @Override
        public DataRow next() {
          Map<String, String> csvRow = csvIterator.next();
          DataRow dataRow = schema.createRow();

          for (int i = 0; i < schema.columns().size(); i++) {
            DataColumn column = schema.columns().get(i);
            String columnName = column.name();
            String value = csvRow.get(columnName);

            if (value != null) {
              Object parsedValue = parseValue(column, value);
              dataRow.set(i, parsedValue);
            } else {
              dataRow.set(i, null);
            }
          }
          return dataRow;
        }
      };

    } catch (IOException e) {
      throw new DataStoreException("Error reading CSV file", e);
    }
  }

  /**
   * Parses the string value from the CSV according to the column type.
   *
   * @param column the data column
   * @param value the string value from the CSV
   * @return the parsed value
   */
  private Object parseValue(DataColumn column, String value) {
    DataColumn.Type type = column.type();
    try {
      if (value == null || value.isEmpty()) {
        return null;
      }
      return switch (type) {
        case STRING -> value;
        case INTEGER -> Integer.parseInt(value);
        case LONG -> Long.parseLong(value);
        case FLOAT -> Float.parseFloat(value);
        case DOUBLE -> Double.parseDouble(value);
        case BOOLEAN -> Boolean.parseBoolean(value);
        case GEOMETRY, POINT, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON,
            GEOMETRYCOLLECTION -> {
          WKTReader reader = new WKTReader();
          yield reader.read(value);
        }
        default -> throw new IllegalArgumentException("Unsupported column type: " + type);
      };
    } catch (Exception e) {
      throw new DataStoreException("Error parsing value for column " + column.name(), e);
    }
  }

  @Override
  public Spliterator<DataRow> spliterator() {
    return Spliterators.spliteratorUnknownSize(iterator(), Spliterator.ORDERED);
  }

  @Override
  public Stream<DataRow> stream() {
    return StreamSupport.stream(spliterator(), false);
  }
}
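
The geometry branches of parseValue delegate to JTS's WKTReader, so geometry columns are expected to hold WKT text. Below is a minimal standalone sketch of that conversion; the class name and coordinates are illustrative and not part of the commit.

import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKTReader;

/**
 * Hypothetical standalone example, not part of this commit.
 */
public class WktParsingSketch {

  public static void main(String[] args) throws ParseException {
    // A WKT literal as it might appear in a geometry column of a CSV row;
    // the coordinates are made up for illustration.
    String wkt = "POINT (6.1432 46.2044)";

    // WKTReader turns the text into a JTS Geometry, mirroring what the geometry
    // branch of CsvDataTable.parseValue does for geometry-typed columns.
    Geometry geometry = new WKTReader().read(wkt);
    System.out.println(geometry.getGeometryType()); // prints "Point"
  }
}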

0 commit comments
