Skip to content

Commit 596a78b

Browse files
committed
fixed it gis deserializer
1 parent 3c31c5b commit 596a78b

File tree

4 files changed

+20
-8
lines changed

4 files changed

+20
-8
lines changed

docs/content.zh/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -621,11 +621,11 @@ PostgreSQL 通过 PostGIS 扩展支持空间数据类型:
621621
<tbody>
622622
<tr>
623623
<td>GEOMETRY(POINT, xx)</td>
624-
<td>{"coordinates":[{"x":174.9479,"y":-36.7208,"z":"NaN","m":"NaN","valid":true}],"type":"Point","srid":3187}"</td>
624+
<td>{"coordinates":"[[174.9479, -36.7208]]","type":"Point","srid":3187}"</td>
625625
</tr>
626626
<tr>
627627
<td>GEOGRAPHY(MULTILINESTRING)</td>
628-
<td>{"coordinates":[{"x":169.1321,"y":-44.7032,"z":"NaN","m":"NaN","valid":true},{"x":167.8974,"y":-44.6414,"z":"NaN","m":"NaN","valid":true}],"type":"MultiLineString","srid":4326}</td>
628+
<td>{"coordinates":"[[169.1321, -44.7032],[167.8974, -44.6414]]","type":"MultiLineString","srid":4326}</td>
629629
</tr>
630630
</tbody>
631631
</table>

docs/content/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -616,11 +616,11 @@ The former is used for small-area planar data, while the latter is used for larg
616616
<tbody>
617617
<tr>
618618
<td>GEOMETRY(POINT, xx)</td>
619-
<td>{"coordinates":[{"x":174.9479,"y":-36.7208,"z":"NaN","m":"NaN","valid":true}],"type":"Point","srid":3187}"</td>
619+
<td>{"coordinates":"[[174.9479, -36.7208]]","type":"Point","srid":3187}"</td>
620620
</tr>
621621
<tr>
622622
<td>GEOGRAPHY(MULTILINESTRING)</td>
623-
<td>{"coordinates":[{"x":169.1321,"y":-44.7032,"z":"NaN","m":"NaN","valid":true},{"x":167.8974,"y":-44.6414,"z":"NaN","m":"NaN","valid":true}],"type":"MultiLineString","srid":4326}</td>
623+
<td>{"coordinates":"[[169.1321, -44.7032],[167.8974, -44.6414]]","type":"MultiLineString","srid":4326}</td>
624624
</tr>
625625
</tbody>
626626
</table>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@
3434
import org.apache.kafka.connect.data.Schema;
3535
import org.apache.kafka.connect.data.Struct;
3636
import org.apache.kafka.connect.source.SourceRecord;
37+
import org.locationtech.jts.geom.Coordinate;
3738
import org.locationtech.jts.io.WKBReader;
3839

40+
import java.util.ArrayList;
3941
import java.util.Collections;
4042
import java.util.HashMap;
4143
import java.util.List;
@@ -128,7 +130,17 @@ protected Object convertToString(Object dbzObj, Schema schema) {
128130
if (geometryType.equals("GeometryCollection")) {
129131
geometryInfo.put("geometries", jtsGeom.toText());
130132
} else {
131-
geometryInfo.put("coordinates", jtsGeom.getCoordinates());
133+
Coordinate[] coordinates = jtsGeom.getCoordinates();
134+
List<double[]> coordinateList = new ArrayList<>();
135+
if (coordinates != null) {
136+
for (Coordinate coordinate : coordinates) {
137+
coordinateList.add(new double[] {coordinate.x, coordinate.y});
138+
geometryInfo.put(
139+
"coordinates", new double[] {coordinate.x, coordinate.y});
140+
}
141+
}
142+
geometryInfo.put(
143+
"coordinates", OBJECT_MAPPER.writeValueAsString(coordinateList));
132144
}
133145
geometryInfo.put("srid", srid.orElse(0));
134146
return BinaryStringData.fromString(OBJECT_MAPPER.writeValueAsString(geometryInfo));

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,9 @@ public void testFullTypes() throws Exception {
201201
64822000,
202202
DecimalData.fromBigDecimal(new BigDecimal("500"), 10, 0),
203203
BinaryStringData.fromString(
204-
"{\"coordinates\":[{\"x\":174.9479,\"y\":-36.7208,\"z\":\"NaN\",\"valid\":true,\"m\":\"NaN\"}],\"type\":\"Point\",\"srid\":3187}"),
204+
"{\"coordinates\":\"[[174.9479,-36.7208]]\",\"type\":\"Point\",\"srid\":3187}"),
205205
BinaryStringData.fromString(
206-
"{\"coordinates\":[{\"x\":169.1321,\"y\":-44.7032,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true},{\"x\":167.8974,\"y\":-44.6414,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true}],\"type\":\"MultiLineString\",\"srid\":4326}"),
206+
"{\"coordinates\":\"[[169.1321,-44.7032],[167.8974,-44.6414]]\",\"type\":\"MultiLineString\",\"srid\":4326}"),
207207
true,
208208
new byte[] {10},
209209
new byte[] {42},
@@ -224,7 +224,7 @@ public void testFullTypes() throws Exception {
224224
+ "</preferences>\n"
225225
+ "</user>"),
226226
BinaryStringData.fromString(
227-
"{\"coordinates\":[{\"x\":3.456,\"y\":7.89,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true}],\"type\":\"Point\",\"srid\":0}"),
227+
"{\"coordinates\":\"[[3.456,7.89]]\",\"type\":\"Point\",\"srid\":0}"),
228228
BinaryStringData.fromString("foo.bar.baz"),
229229
BinaryStringData.fromString("JohnDoe"),
230230
BinaryStringData.fromString("{\"size\":\"L\",\"color\":\"blue\"}"),

0 commit comments

Comments
 (0)