Commit 25d88b3

support all pg types

This squashed commit also carries the following changes:

* [FLINK-38141][pipeline-connector/iceberg] Fix iceberg connector incorrect type mapping (#4070). Co-authored-by: zhangchao.doovvv <zhangchao.doovvv@bytedance.com>
* [minor] Fix potential SQL connection statement issue (#4069)
* [FLINK-38185][pipeline-connector][iceberg] Correctly handle the type conversion of TIMESTAMP_WITH_TIME_ZONE (#4074)
* [tests][pipeline-connector/fluss] Add MySQL to Fluss E2e IT case (#4057). Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
* [FLINK-38184] Call getCopyOfBuffer only once when serializing a split (#4073)
* [FLINK-38164][pipeline-connector/mysql] Support MySQL LONG and LONG VARCHAR types (#4076). Co-authored-by: zhangchao.doovvv <zhangchao.doovvv@bytedance.com>
* [FLINK-37828] Enable scan.incremental.snapshot.unbounded-chunk-first by default for improved stability (#4082)
* [FLINK-38194][pipeline-connector/iceberg] Iceberg connector supports auto-creating namespaces (#4080). Co-authored-by: zhangchao.doovvv <zhangchao.doovvv@bytedance.com>
* [FLINK-37905][runtime] Fix transform failure with non-ASCII string literals (#4038). Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
* [FLINK-38142] Upgrade the Paimon version to 1.2.0 (#4066)
* [FLINK-37963] Fix potential NPE when triggering JobManager failover prematurely (#4044)
* Fix decimal add test
* Fix decimal mode test
* Add all types
* [FLINK-38059][doc] Add Fluss pipeline connector documentation (#4088). Co-authored-by: wangjunbo <wangjunbo@qiyi.com>
* [FLINK-37835] Fix NullPointerException when starting with latest-offset (#4091). Co-authored-by: wuzexian <shanqing.wzx@alibaba-inc.com>
* [FLINK-38188][pipeline-connector][postgres] Fix database name validation logic in PostgresDataSourceFactory (#4075). Co-authored-by: lvyanquan <lvyanquan.lyq@alibaba-inc.com>
* [hotfix][doc] Update the `uses` of the Upload documentation steps in build_docs.yml (#4093)
* [FLINK-38204][pipeline-connector][maxcompute] Use getLatestEvolvedSchema to get the schema in SessionManageOperator when routes are used (#4094). Co-authored-by: wuzexian <shanqing.wzx@alibaba-inc.com>
1 parent 989de07 · commit 25d88b3

66 files changed · +2137 −306 lines changed

.github/workflows/build_docs.yml

Lines changed: 2 additions & 2 deletions
```diff
@@ -84,7 +84,7 @@ jobs:
           docker run --rm --volume "$PWD:/root/flink-cdc" chesnay/flink-ci:java_8_11_17_21_maven_386 bash -c "cd /root/flink-cdc && chmod +x ./.github/workflows/docs.sh && ./.github/workflows/docs.sh"

       - name: Upload documentation
-        uses: burnett01/rsync-deployments@5.2
+        uses: burnett01/rsync-deployments@0dc935cdecc5f5e571865e60d2a6cdc673704823
         with:
           switches: --archive --compress --delete
           path: docs/target/
@@ -96,7 +96,7 @@ jobs:

       - name: Upload documentation alias
         if: env.flink_alias != ''
-        uses: burnett01/rsync-deployments@5.2
+        uses: burnett01/rsync-deployments@0dc935cdecc5f5e571865e60d2a6cdc673704823
         with:
           switches: --archive --compress --delete
           path: docs/target/
```
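The hotfix above replaces the action's mutable `5.2` version tag with a full commit SHA, so the workflow always resolves to the exact audited revision even if the tag is later moved. As a general illustration — a minimal sketch, with a placeholder action name and SHA that are not part of this commit — the same pinning pattern looks like:

```yaml
# Hypothetical workflow step: pin a third-party action to an immutable commit SHA
# rather than a tag, which the action's maintainer could re-point at other code.
- name: Deploy artifacts
  uses: example-org/deploy-action@1111111111111111111111111111111111111111  # placeholder SHA
  with:
    target: docs/target/
```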

docs/content.zh/docs/connectors/flink-sources/db2-cdc.md

Lines changed: 1 addition & 2 deletions
```diff
@@ -267,12 +267,11 @@ Db2 server.
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
-        Experimental option, defaults to false.
       </td>
     </tr>
   </tbody>
```
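The same default flip (FLINK-37828) recurs in the Db2, MongoDB, MySQL, Oracle, Postgres, and SQLServer source docs below. Users who prefer the previous behavior can set the option back to `false` explicitly; here is a minimal sketch against the MySQL pipeline source, which exposes the same option name (host, credentials, and table are hypothetical):

```yaml
source:
  type: mysql
  hostname: localhost          # hypothetical connection details
  port: 3306
  username: app_user
  password: app_pass
  tables: appdb.orders
  # Opt out of the new default: assign the unbounded chunk last, as before
  scan.incremental.snapshot.unbounded-chunk-first.enabled: false
```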

docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md

Lines changed: 1 addition & 2 deletions
```diff
@@ -328,12 +328,11 @@ MongoDB's change event records don't carry the update-before message, so we can only
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during the snapshot reading phase.<br>
         This helps reduce the risk of the TaskManager hitting an out-of-memory (OOM) error while synchronizing the last chunk during the snapshot phase.<br>
-        This is an experimental feature; defaults to false.
       </td>
     </tr>
     <tr>
```

docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 1 addition & 2 deletions
```diff
@@ -403,12 +403,11 @@ Flink SQL> SELECT * FROM orders;
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during the snapshot reading phase.<br>
         This helps reduce the risk of the TaskManager hitting an out-of-memory (OOM) error while synchronizing the last chunk during the snapshot phase.<br>
-        This is an experimental feature; defaults to false.
       </td>
     </tr>
     <tr>
```

docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md

Lines changed: 1 addition & 2 deletions
```diff
@@ -428,12 +428,11 @@ Connector Options
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
-        Experimental option, defaults to false.
       </td>
     </tr>
     <tr>
```

docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md

Lines changed: 1 addition & 2 deletions
```diff
@@ -248,12 +248,11 @@ Connector Options
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
-        Experimental option, defaults to false.
       </td>
     </tr>
     <tr>
```

docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md

Lines changed: 1 addition & 2 deletions
```diff
@@ -244,12 +244,11 @@ Connector Options
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
-        Experimental option, defaults to false.
       </td>
     </tr>
     <tr>
```
docs/content.zh/docs/connectors/pipeline-connectors/fluss.md

Lines changed: 243 additions & 0 deletions (new file)

---
title: "Fluss"
weight: 4
type: docs
aliases:
- /connectors/pipeline-connectors/fluss
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->

# Fluss Pipeline Connector

The Fluss Pipeline connector can be used as the *Data Sink* of a pipeline to write data to [Fluss](https://fluss.apache.org). This document describes how to set up the Fluss Pipeline connector.

## What can the connector do?
* Automatically create non-existent tables
* Data synchronization

How to create Pipeline
----------------

A pipeline that reads data from MySQL and writes it to Fluss can be defined as follows:

```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: fluss
  name: Fluss Sink
  bootstrap.servers: localhost:9123
  # Security-related properties for the Fluss client
  properties.client.security.protocol: sasl
  properties.client.security.sasl.mechanism: PLAIN
  properties.client.security.sasl.username: developer
  properties.client.security.sasl.password: developer-pass

pipeline:
  name: MySQL to Fluss Pipeline
  parallelism: 2
```
Pipeline Connector Options
----------------
<div class="highlight">
<table class="colwidths-auto docutils">
   <thead>
     <tr>
       <th class="text-left" style="width: 25%">Option</th>
       <th class="text-left" style="width: 8%">Required</th>
       <th class="text-left" style="width: 7%">Default</th>
       <th class="text-left" style="width: 10%">Type</th>
       <th class="text-left" style="width: 50%">Description</th>
     </tr>
   </thead>
   <tbody>
    <tr>
      <td>type</td>
      <td>required</td>
      <td style="word-wrap: break-word;">(none)</td>
      <td>String</td>
      <td>Specifies the connector to use; this must be set to <code>'fluss'</code>.</td>
    </tr>
    <tr>
      <td>name</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">(none)</td>
      <td>String</td>
      <td>The name of the sink.</td>
    </tr>
    <tr>
      <td>bootstrap.servers</td>
      <td>required</td>
      <td style="word-wrap: break-word;">(none)</td>
      <td>String</td>
      <td>A list of host/port pairs used to establish the initial connection to the Fluss cluster.</td>
    </tr>
    <tr>
      <td>bucket.key</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">(none)</td>
      <td>String</td>
      <td>Specifies the data distribution policy of each Fluss table. Tables are separated by ';', bucket keys by ','. Format: database1.table1:key1,key2;database1.table2:key3.
      Data is distributed to buckets by the hash of the bucket key, which must be a subset of the primary key excluding the partition key of a partitioned primary-key table.
      If a table has a primary key and no bucket key is specified, the bucket key defaults to the primary key (excluding the partition key); if a table has no primary key and no bucket key is specified, data is distributed to buckets randomly.</td>
    </tr>
    <tr>
      <td>bucket.num</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">(none)</td>
      <td>String</td>
      <td>The number of buckets of each Fluss table. Tables are separated by ';'. Format: database1.table1:4;database1.table2:8.</td>
    </tr>
    <tr>
      <td>properties.table.*</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">(none)</td>
      <td>String</td>
      <td>Passes supported Fluss table options to the pipeline. See <a href="https://fluss.apache.org/docs/engine-flink/options/#storage-options">Fluss table options</a>.</td>
    </tr>
    <tr>
      <td>properties.client.*</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">(none)</td>
      <td>String</td>
      <td>Passes supported Fluss client options to the pipeline. See <a href="https://fluss.apache.org/docs/engine-flink/options/#write-options">Fluss client options</a>.</td>
    </tr>
   </tbody>
</table>
</div>
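The `bucket.key` and `bucket.num` formats compose per table, with `;` between tables. A minimal sketch of a sink block using both options (the `mydb.orders` / `mydb.logs` tables and their key columns are hypothetical):

```yaml
sink:
  type: fluss
  bootstrap.servers: localhost:9123
  # orders is hashed into 4 buckets by user_id; logs gets 8 buckets keyed by request_id
  bucket.key: mydb.orders:user_id;mydb.logs:request_id
  bucket.num: mydb.orders:4;mydb.logs:8
```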
## Usage Notes

* Fluss primary-key tables and log tables are supported.

* About automatic table creation:
  * There is no partition key.
  * The number of buckets is controlled by the `bucket.num` option.
  * Data distribution is controlled by the `bucket.key` option. For a primary-key table, if no bucket key is specified, the bucket key defaults to the primary key (excluding the partition key); for a log table without a primary key, if no bucket key is specified, data is distributed to buckets randomly.

* Schema change synchronization is not supported. To ignore schema changes, use `schema.change.behavior: IGNORE` (see the sketch after this list).

* For data synchronization, the pipeline connector uses the [Fluss Java Client](https://fluss.apache.org/docs/apis/java-client/) to write data to Fluss.
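As referenced in the list above, here is a minimal sketch of ignoring schema changes for this sink. `schema.change.behavior` is a pipeline-level option in Flink CDC YAML definitions; the name and parallelism values are illustrative:

```yaml
pipeline:
  name: MySQL to Fluss Pipeline
  parallelism: 2
  # Drop schema change events instead of failing, since the Fluss sink cannot apply them
  schema.change.behavior: IGNORE
```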
Data Type Mapping
----------------
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
   <thead>
     <tr>
       <th class="text-left">Flink CDC type</th>
       <th class="text-left">Fluss type</th>
       <th class="text-left" style="width:60%;">Note</th>
     </tr>
   </thead>
   <tbody>
    <tr>
      <td>TINYINT</td>
      <td>TINYINT</td>
      <td></td>
    </tr>
    <tr>
      <td>SMALLINT</td>
      <td>SMALLINT</td>
      <td></td>
    </tr>
    <tr>
      <td>INT</td>
      <td>INT</td>
      <td></td>
    </tr>
    <tr>
      <td>BIGINT</td>
      <td>BIGINT</td>
      <td></td>
    </tr>
    <tr>
      <td>FLOAT</td>
      <td>FLOAT</td>
      <td></td>
    </tr>
    <tr>
      <td>DOUBLE</td>
      <td>DOUBLE</td>
      <td></td>
    </tr>
    <tr>
      <td>DECIMAL(p, s)</td>
      <td>DECIMAL(p, s)</td>
      <td></td>
    </tr>
    <tr>
      <td>BOOLEAN</td>
      <td>BOOLEAN</td>
      <td></td>
    </tr>
    <tr>
      <td>DATE</td>
      <td>DATE</td>
      <td></td>
    </tr>
    <tr>
      <td>TIME</td>
      <td>TIME</td>
      <td></td>
    </tr>
    <tr>
      <td>TIMESTAMP</td>
      <td>TIMESTAMP</td>
      <td></td>
    </tr>
    <tr>
      <td>TIMESTAMP_LTZ</td>
      <td>TIMESTAMP_LTZ</td>
      <td></td>
    </tr>
    <tr>
      <td>CHAR(n)</td>
      <td>CHAR(n)</td>
      <td></td>
    </tr>
    <tr>
      <td>VARCHAR(n)</td>
      <td>VARCHAR(n)</td>
      <td></td>
    </tr>
    <tr>
      <td>BINARY(n)</td>
      <td>BINARY(n)</td>
      <td></td>
    </tr>
    <tr>
      <td>VARBINARY(n)</td>
      <td>BYTES</td>
      <td></td>
    </tr>
   </tbody>
</table>
</div>

{{< top >}}

docs/content.zh/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -596,6 +596,8 @@ source:
         TEXT<br>
         MEDIUMTEXT<br>
         LONGTEXT<br>
+        LONG<br>
+        LONG VARCHAR<br>
       </td>
       <td>STRING</td>
       <td></td>
```

docs/content/docs/connectors/flink-sources/db2-cdc.md

Lines changed: 1 addition & 2 deletions
```diff
@@ -267,12 +267,11 @@ Db2 server.
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
-        Experimental option, defaults to false.
       </td>
     </tr>
   </tbody>
```
