Skip to content

Commit 801753a

Browse files
committed
fixed it gis deserializer
1 parent 3c31c5b commit 801753a

File tree

4 files changed

+25
-8
lines changed

4 files changed

+25
-8
lines changed

docs/content.zh/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -621,11 +621,11 @@ PostgreSQL 通过 PostGIS 扩展支持空间数据类型:
621621
<tbody>
622622
<tr>
623623
<td>GEOMETRY(POINT, xx)</td>
624-
<td>{"coordinates":[{"x":174.9479,"y":-36.7208,"z":"NaN","m":"NaN","valid":true}],"type":"Point","srid":3187}"</td>
624+
<td>{"coordinates":"[174.9479, -36.7208]","type":"Point","srid":3187}"</td>
625625
</tr>
626626
<tr>
627627
<td>GEOGRAPHY(MULTILINESTRING)</td>
628-
<td>{"coordinates":[{"x":169.1321,"y":-44.7032,"z":"NaN","m":"NaN","valid":true},{"x":167.8974,"y":-44.6414,"z":"NaN","m":"NaN","valid":true}],"type":"MultiLineString","srid":4326}</td>
628+
<td>{"coordinates":"[169.1321, -44.7032],[167.8974, -44.6414]","type":"MultiLineString","srid":4326}</td>
629629
</tr>
630630
</tbody>
631631
</table>

docs/content/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -616,11 +616,11 @@ The former is used for small-area planar data, while the latter is used for larg
616616
<tbody>
617617
<tr>
618618
<td>GEOMETRY(POINT, xx)</td>
619-
<td>{"coordinates":[{"x":174.9479,"y":-36.7208,"z":"NaN","m":"NaN","valid":true}],"type":"Point","srid":3187}"</td>
619+
<td>{"coordinates":"[174.9479, -36.7208]","type":"Point","srid":3187}"</td>
620620
</tr>
621621
<tr>
622622
<td>GEOGRAPHY(MULTILINESTRING)</td>
623-
<td>{"coordinates":[{"x":169.1321,"y":-44.7032,"z":"NaN","m":"NaN","valid":true},{"x":167.8974,"y":-44.6414,"z":"NaN","m":"NaN","valid":true}],"type":"MultiLineString","srid":4326}</td>
623+
<td>{"coordinates":"[169.1321, -44.7032],[167.8974, -44.6414]","type":"MultiLineString","srid":4326}</td>
624624
</tr>
625625
</tbody>
626626
</table>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,17 @@
3434
import org.apache.kafka.connect.data.Schema;
3535
import org.apache.kafka.connect.data.Struct;
3636
import org.apache.kafka.connect.source.SourceRecord;
37+
import org.locationtech.jts.geom.Coordinate;
3738
import org.locationtech.jts.io.WKBReader;
3839

40+
import java.util.ArrayList;
41+
import java.util.Arrays;
3942
import java.util.Collections;
4043
import java.util.HashMap;
4144
import java.util.List;
4245
import java.util.Map;
4346
import java.util.Optional;
47+
import java.util.stream.Collectors;
4448

4549
/** Event deserializer for {@link PostgresDataSource}. */
4650
@Internal
@@ -128,7 +132,20 @@ protected Object convertToString(Object dbzObj, Schema schema) {
128132
if (geometryType.equals("GeometryCollection")) {
129133
geometryInfo.put("geometries", jtsGeom.toText());
130134
} else {
131-
geometryInfo.put("coordinates", jtsGeom.getCoordinates());
135+
Coordinate[] coordinates = jtsGeom.getCoordinates();
136+
List<double[]> coordinateList = new ArrayList<>();
137+
if (coordinates != null) {
138+
for (Coordinate coordinate : coordinates) {
139+
coordinateList.add(new double[] {coordinate.x, coordinate.y});
140+
geometryInfo.put(
141+
"coordinates", new double[] {coordinate.x, coordinate.y});
142+
}
143+
}
144+
geometryInfo.put(
145+
"coordinates",
146+
coordinateList.stream()
147+
.map(Arrays::toString)
148+
.collect(Collectors.joining(",")));
132149
}
133150
geometryInfo.put("srid", srid.orElse(0));
134151
return BinaryStringData.fromString(OBJECT_MAPPER.writeValueAsString(geometryInfo));

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,9 @@ public void testFullTypes() throws Exception {
201201
64822000,
202202
DecimalData.fromBigDecimal(new BigDecimal("500"), 10, 0),
203203
BinaryStringData.fromString(
204-
"{\"coordinates\":[{\"x\":174.9479,\"y\":-36.7208,\"z\":\"NaN\",\"valid\":true,\"m\":\"NaN\"}],\"type\":\"Point\",\"srid\":3187}"),
204+
"{\"coordinates\":\"[174.9479, -36.7208]\",\"type\":\"Point\",\"srid\":3187}"),
205205
BinaryStringData.fromString(
206-
"{\"coordinates\":[{\"x\":169.1321,\"y\":-44.7032,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true},{\"x\":167.8974,\"y\":-44.6414,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true}],\"type\":\"MultiLineString\",\"srid\":4326}"),
206+
"{\"coordinates\":\"[169.1321, -44.7032],[167.8974, -44.6414]\",\"type\":\"MultiLineString\",\"srid\":4326}"),
207207
true,
208208
new byte[] {10},
209209
new byte[] {42},
@@ -224,7 +224,7 @@ public void testFullTypes() throws Exception {
224224
+ "</preferences>\n"
225225
+ "</user>"),
226226
BinaryStringData.fromString(
227-
"{\"coordinates\":[{\"x\":3.456,\"y\":7.89,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true}],\"type\":\"Point\",\"srid\":0}"),
227+
"{\"coordinates\":\"[3.456, 7.89]\",\"type\":\"Point\",\"srid\":0}"),
228228
BinaryStringData.fromString("foo.bar.baz"),
229229
BinaryStringData.fromString("JohnDoe"),
230230
BinaryStringData.fromString("{\"size\":\"L\",\"color\":\"blue\"}"),

0 commit comments

Comments
 (0)