Skip to content

[FLINK-37959][postgres]Supports all pgsql field types like debezium #4086

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 268 additions & 48 deletions docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ pipeline:

## 数据类型映射


<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
Expand All @@ -300,6 +299,17 @@ pipeline:
</tr>
</thead>
<tbody>
<tr>
<td>
BOOLEAN <br>
BIT(1) <br>
<td>BOOLEAN</td>
</tr>
<tr>
<td>
BIT( > 1)
<td>BYTES</td>
</tr>
<tr>
<td>
SMALLINT<br>
Expand All @@ -317,79 +327,289 @@ pipeline:
<tr>
<td>
BIGINT<br>
BIGSERIAL</td>
<td>BIGINT</td>
</tr>
<tr>
<td>NUMERIC</td>
<td>DECIMAL(20, 0)</td>
</tr>
<tr>
<td>BIGINT</td>
BIGSERIAL<br>
OID<br>
</td>
<td>BIGINT</td>
</tr>
<tr>
<td>
REAL<br>
FLOAT4</td>
FLOAT4
</td>
<td>FLOAT</td>
</tr>
<tr>
<td>
FLOAT8<br>
DOUBLE PRECISION</td>
<td>DOUBLE</td>
</tr>
<tr>
<td>
NUMERIC(p, s)<br>
DECIMAL(p, s)</td>
<td>DECIMAL(p, s)</td>
<tr>
<td>NUMERIC</td>
<td>DECIMAL(38, 0)</td>
</tr>
<tr>
<td>BOOLEAN</td>
<td>BOOLEAN</td>
<td>DOUBLE PRECISION<br>
FLOAT8
</td>
<td>DOUBLE</td>
</tr>
<tr>
<td> CHAR[(M)]<br>
VARCHAR[(M)]<br>
CHARACTER[(M)]<br>
BPCHAR[(M)]<br>
CHARACTER VARYING[(M)]
</td>
<td>STRING</td>
</tr>
<tr>
<td>DATE</td>
<td>DATE</td>
<td>TIMESTAMPTZ<br>
TIMESTAMP WITH TIME ZONE</td>
<td>ZonedTimestampType</td>
</tr>
<tr>
<td>TIME [(p)] [WITHOUT TIMEZONE]</td>
<td>TIME [(p)] [WITHOUT TIMEZONE]</td>
<td>INTERVAL [P]</td>
<td>BIGINT</td>
</tr>
<tr>
<td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td>
<td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td>
<td>INTERVAL [P]</td>
<td>STRING(when <code>debezium.interval.handling.mode</code> is set to string)</td>
</tr>
<tr>
<td>
CHAR(n)<br>
CHARACTER(n)<br>
VARCHAR(n)<br>
CHARACTER VARYING(n)</td>
<td>CHAR(n)</td>
<td>BYTEA</td>
<td>BYTES or STRING (when <code>debezium.binary.handling.mode</code> is set to base64 or base64-url-safe or hex)</td>
</tr>
<tr>
<td>
TEXT</td>
JSON<br>
JSONB<br>
XML<br>
UUID<br>
POINT<br>
LTREE<br>
CITEXT<br>
INET<br>
INT4RANGE<br>
INT8RANGE<br>
NUMRANGE<br>
TSRANGE<br>
DATERANGE<br>
ENUM
</td>
<td>STRING</td>
</tr>
<tr>
<td>BYTEA</td>
<td>BYTES</td>
</tr>
</tbody>
</table>
</div>

### 空间数据类型映射
PostgreSQL通过PostGIS扩展支持空间数据类型:
### Temporal types Mapping
除了包含时区信息的 PostgreSQL 的 TIMESTAMPTZ 数据类型之外,其他时间类型如何映射取决于连接器配置属性 time.precision.mode 的值。以下各节将描述这些映射关系:
time.precision.mode=adaptive

time.precision.mode=adaptive_time_microseconds

time.precision.mode=connect


当 time.precision.mode 属性设置为默认的 adaptive(自适应)时,连接器会根据列的数据类型定义来确定字面类型和语义类型。这可以确保事件能够精确地表示数据库中的值。
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
<th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
</tr>
</thead>
<tbody>
<tr>
<td>
DATE
<td>DATE</td>
</tr>
<tr>
<td>
TIME([P])
</td>
<td>TIME([P])</td>
</tr>
<tr>
<td>
TIMESTAMP([P])
</td>
<td>TIMESTAMP([P])</td>
</tr>
</tbody>
</table>
</div>

### Decimal types Mapping
PostgreSQL 连接器配置属性 <code>debezium.decimal.handling.mode</code> 的设置决定了连接器如何映射十进制类型。

当 <code>debezium.decimal.handling.mode</code> 属性设置为 precise(精确)时,连接器会对所有 DECIMAL、NUMERIC 和 MONEY 列使用 Kafka Connect 的 org.apache.kafka.connect.data.Decimal 逻辑类型。这是默认模式。
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
<th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
</tr>
</thead>
<tbody>
<tr>
<td>
NUMERIC[(M[,D])]
<td>DECIMAL[(M[,D])]</td>
</tr>
<tr>
<td>
NUMERIC
<td>DECIMAL(38,0)</td>
</tr>
<tr>
<td>
DECIMAL[(M[,D])]
<td>DECIMAL[(M[,D])]</td>
</tr>
<tr>
<td>
DECIMAL
<td>DECIMAL(38,0)</td>
</tr>
<tr>
<td>
MONEY[(M[,D])]
<td>DECIMAL(38,digits)(schema 参数 scale 包含一个整数,表示小数点移动了多少位。scale schema 参数由 money.fraction.digits 连接器配置属性决定。)</td>
</tr>
</tbody>
</table>
</div>

当 <code>debezium.decimal.handling.mode</code> 属性设置为 double 时,连接器将所有 DECIMAL、NUMERIC 和 MONEY 值表示为 Java 的 double 值,并按照下表所示进行编码。

<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
<th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
</tr>
</thead>
<tbody>
<tr>
<td>
NUMERIC[(M[,D])]
<td>DOUBLE</td>
</tr>
<tr>
<td>
DECIMAL[(M[,D])]
<td>DOUBLE</td>
</tr>
<tr>
<td>
MONEY[(M[,D])]
<td>DOUBLE</td>
</tr>
</tbody>
</table>
</div>

GEOMETRY(POINT, xx):表示使用笛卡尔坐标系的点,EPSG:xxx定义其坐标系统,适用于局部平面计算。
GEOGRAPHY(MULTILINESTRING):以经纬度存储多条线串,基于球面模型,适合全球范围的空间分析。
<code>debezium.decimal.handling.mode</code> 配置属性的最后一个可选设置是 string(字符串)。在这种情况下,连接器将 DECIMAL、NUMERIC 和 MONEY 值表示为其格式化的字符串形式,并按照下表所示进行编码。
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
<th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
</tr>
</thead>
<tbody>
<tr>
<td>
NUMERIC[(M[,D])]
<td>STRING</td>
</tr>
<tr>
<td>
DECIMAL[(M[,D])]
<td>STRING</td>
</tr>
<tr>
<td>
MONEY[(M[,D])]
<td>STRING</td>
</tr>
</tbody>
</table>
</div>

前者用于小范围平面数据,后者用于大范围、需考虑地球曲率的地理数据。
当 <code>debezium.decimal.handling.mode</code> 的设置为 string 或 double 时,PostgreSQL 支持将 NaN(非数字)作为一个特殊值存储在 DECIMAL/NUMERIC 值中。在这种情况下,连接器会将 NaN 编码为 Double.NaN 或字符串常量 NAN。

### HSTORE type Mapping
PostgreSQL 连接器配置属性 <code>debezium.hstore.handling.mode</code> 的设置决定了连接器如何映射 HSTORE 值。

当 <code>debezium.hstore.handling.mode</code> 属性设置为 json(默认值)时,连接器将 HSTORE 值表示为 JSON 值的字符串形式,并按照下表所示进行编码。当 <code>debezium.hstore.handling.mode</code> 属性设置为 map 时,连接器对 HSTORE 值使用 MAP 模式类型。
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
<th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
</tr>
</thead>
<tbody>
<tr>
<td>
HSTORE
<td>STRING(<code>`debezium.hstore.handling.mode`=`string`</code>)</td>
</tr>
<tr>
<td>
HSTORE
<td>MAP(<code>`debezium.hstore.handling.mode`=`map`</code>)</td>
</tr>
</tbody>
</table>
</div>

### Network address types Mapping
PostgreSQL 拥有可以存储 IPv4、IPv6 和 MAC 地址的数据类型。使用这些类型来存储网络地址比使用纯文本类型更为合适。网络地址类型提供了输入错误检查以及专用的操作符和函数。
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
<th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
</tr>
</thead>
<tbody>
<tr>
<td>
INET
<td>STRING</td>
</tr>
<tr>
<td>
CIDR
<td>STRING</td>
</tr>
<tr>
<td>
MACADDR
<td>STRING</td>
</tr>
<tr>
<td>
MACADDR8
<td>STRING</td>
</tr>
</tbody>
</table>
</div>

### PostGIS Types Mapping
PostgreSQL 通过 PostGIS 扩展支持空间数据类型:
```
GEOMETRY(POINT, xx): 在笛卡尔坐标系中表示一个点,其中 EPSG:xx 定义了坐标系。它适用于局部平面计算。
GEOGRAPHY(MULTILINESTRING): 在基于球面模型的纬度和经度上存储多条线串。它适用于全球范围的空间分析。
```
前者适用于小范围的平面数据,而后者适用于需要考虑地球曲率的大范围数据。
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
Expand All @@ -401,11 +621,11 @@ PostgreSQL通过PostGIS扩展支持空间数据类型:
<tbody>
<tr>
<td>GEOMETRY(POINT, xx)</td>
<td>{"hexewkb":"0101000020730c00001c7c613255de6540787aa52c435c42c0","srid":3187}</td>
<td>{"coordinates":"[[174.9479, -36.7208]]","type":"Point","srid":3187}"</td>
</tr>
<tr>
<td>GEOGRAPHY(MULTILINESTRING)</td>
<td>{"hexewkb":"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0","srid":4326}</td>
<td>{"coordinates":"[[169.1321, -44.7032],[167.8974, -44.6414]]","type":"MultiLineString","srid":4326}</td>
</tr>
</tbody>
</table>
Expand Down
Loading
Loading