Commit 49fe26d

[FLINK-37738][cdc-connector][postgres] Support read changelog as append only mode (#4005)
Parent: 1c0c5af

11 files changed: +276 -259 lines


docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md

Lines changed: 13 additions & 0 deletions
@@ -268,6 +268,19 @@ Connector Options
           For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
         </td>
     </tr>
+    <tr>
+      <td>scan.read-changelog-as-append-only.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to convert the changelog stream to an append-only stream.<br>
+        Enable this feature only in special scenarios where upstream deletion messages must be retained. For example, in a logical-deletion setup where the downstream is not allowed to physically delete records, use this option together with the row_kind metadata field: the downstream first stores every change record, then uses the row_kind field to decide whether to apply a logical deletion.<br>
+        The option values are as follows:<br>
+        <li>true: all types of messages (including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER) are converted into INSERT messages.</li>
+        <li>false (default): all types of messages are emitted as-is.</li>
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
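
To make the new option concrete: below is a minimal, hedged sketch of how it could be used from Flink's Java Table API together with the row_kind metadata field that the description refers to. The table name, schema, and connection settings are hypothetical and not part of this commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AppendOnlyChangelogExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical table: with the option enabled, every change event
        // (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER) arrives as an INSERT
        // row, while row_kind preserves the original operation type.
        tEnv.executeSql(
                "CREATE TABLE shipments ("
                        + "  row_kind STRING METADATA FROM 'row_kind' VIRTUAL,"
                        + "  shipment_id INT,"
                        + "  order_id INT"
                        + ") WITH ("
                        + "  'connector' = 'postgres-cdc',"
                        + "  'hostname' = 'localhost',"
                        + "  'port' = '5432',"
                        + "  'username' = 'postgres',"
                        + "  'password' = 'postgres',"
                        + "  'database-name' = 'postgres',"
                        + "  'schema-name' = 'public',"
                        + "  'table-name' = 'shipments',"
                        + "  'scan.read-changelog-as-append-only.enabled' = 'true'"
                        + ")");
    }
}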

docs/content/docs/connectors/flink-sources/postgres-cdc.md

Lines changed: 13 additions & 0 deletions
@@ -265,6 +265,19 @@ SELECT * FROM shipments;
           For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
         </td>
     </tr>
+    <tr>
+      <td>scan.read-changelog-as-append-only.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to convert the changelog stream to an append-only stream.<br>
+        Enable this feature only in special scenarios where upstream deletion messages must be retained. For example, in a logical-deletion setup where the downstream is not allowed to physically delete records, use this option together with the row_kind metadata field: the downstream first stores every change record, then uses the row_kind field to decide whether to apply a logical deletion.<br>
+        The option values are as follows:<br>
+        <li>true: all types of messages (including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER) are converted into INSERT messages.</li>
+        <li>false (default): all types of messages are emitted as-is.</li>
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
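
Downstream, the preserved row_kind values can drive the logical deletion described above. A short follow-up sketch, continuing from the hypothetical shipments table in the earlier example and assuming the metadata column surfaces RowKind short strings such as '+I' and '-D':

// Flag rows whose change event was a delete as logically deleted,
// without ever physically removing them downstream.
tEnv.executeSql(
        "SELECT shipment_id, order_id, (row_kind = '-D') AS is_deleted "
                + "FROM shipments");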

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java

Lines changed: 8 additions & 0 deletions
@@ -146,4 +146,12 @@ public class SourceOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED =
+            ConfigOptions.key("scan.read-changelog-as-append-only.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to convert the changelog data stream to an append-only data stream");
 }
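
A small, hedged sketch of how the new ConfigOption behaves through Flink's Configuration API; the demo class itself is hypothetical:

import org.apache.flink.configuration.Configuration;

import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED;

public class AppendOnlyOptionDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Unset: falls back to the declared default of false.
        System.out.println(conf.get(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED)); // false

        // Set explicitly, as a table's WITH (...) clause would.
        conf.set(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED, true);
        System.out.println(conf.get(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED)); // true
    }
}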

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java

Lines changed: 4 additions & 1 deletion
@@ -81,6 +81,8 @@ public interface ValueValidator extends Serializable {
     /** Whether the deserializer needs to handle metadata columns. */
     private final boolean hasMetadata;
 
+    private final boolean appendOnly;
+
     /**
      * A wrapped output collector which is used to append metadata columns after physical columns.
      */
@@ -116,6 +118,7 @@ public static Builder newBuilder() {
         this.resultTypeInfo = checkNotNull(resultTypeInfo);
         this.validator = checkNotNull(validator);
         this.changelogMode = checkNotNull(changelogMode);
+        this.appendOnly = appendOnly;
     }
 
     @Override
@@ -161,7 +164,7 @@ private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws
     }
 
     private void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector) {
-        if (!hasMetadata) {
+        if (!hasMetadata && !appendOnly) {
             collector.collect(physicalRow);
             return;
         }
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java

Lines changed: 1 addition & 2 deletions
@@ -26,7 +26,6 @@
 import org.apache.flink.cdc.debezium.table.DebeziumOptions;
 import org.apache.flink.cdc.debezium.utils.JdbcUrlUtils;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions;
@@ -125,7 +124,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                     MySqlSourceOptions.CONNECT_TIMEOUT, connectTimeout, Duration.ofMillis(250));
         }
 
-        OptionUtils.printOptions(IDENTIFIER, ((Configuration) config).toMap());
+        OptionUtils.printOptions(IDENTIFIER, config.toMap());
 
         return new MySqlTableSource(
                 physicalSchema,
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java

Lines changed: 6 additions & 3 deletions
@@ -23,7 +23,6 @@
 import org.apache.flink.cdc.connectors.postgres.utils.OptionUtils;
 import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -46,6 +45,7 @@
 import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
 import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
 import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED;
 import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHANGELOG_MODE;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE;
@@ -119,6 +119,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
         int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
         boolean assignUnboundedChunkFirst =
                 config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
+        boolean appendOnly = config.get(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
 
         if (enableParallelRead) {
             validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
@@ -134,7 +135,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
                     "The Postgres CDC connector does not support 'latest-offset' startup mode when 'scan.incremental.snapshot.enabled' is disabled, you can enable 'scan.incremental.snapshot.enabled' to use this startup mode.");
         }
 
-        OptionUtils.printOptions(IDENTIFIER, ((Configuration) config).toMap());
+        OptionUtils.printOptions(IDENTIFIER, config.toMap());
 
         return new PostgreSQLTableSource(
                 physicalSchema,
@@ -165,7 +166,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
                 skipSnapshotBackfill,
                 isScanNewlyAddedTableEnabled,
                 lsnCommitCheckpointsDelay,
-                assignUnboundedChunkFirst);
+                assignUnboundedChunkFirst,
+                appendOnly);
     }
 
     @Override
@@ -209,6 +211,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
+        options.add(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
         return options;
     }
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java

Lines changed: 15 additions & 4 deletions
@@ -86,6 +86,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
     private final boolean scanNewlyAddedTableEnabled;
     private final int lsnCommitCheckpointsDelay;
     private final boolean assignUnboundedChunkFirst;
+    private final boolean appendOnly;
 
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -126,7 +127,8 @@ public PostgreSQLTableSource(
             boolean skipSnapshotBackfill,
             boolean isScanNewlyAddedTableEnabled,
             int lsnCommitCheckpointsDelay,
-            boolean assignUnboundedChunkFirst) {
+            boolean assignUnboundedChunkFirst,
+            boolean appendOnly) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -159,10 +161,15 @@ public PostgreSQLTableSource(
         this.scanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled;
         this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
         this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
+        this.appendOnly = appendOnly;
     }
 
     @Override
     public ChangelogMode getChangelogMode() {
+        if (appendOnly) {
+            return ChangelogMode.insertOnly();
+        }
+
         switch (changelogMode) {
             case UPSERT:
                 return org.apache.flink.table.connector.ChangelogMode.upsert();
@@ -190,6 +197,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                                 PostgreSQLDeserializationConverterFactory.instance())
                         .setValueValidator(new PostgresValueValidator(schemaName, tableName))
                         .setChangelogMode(changelogMode)
+                        .setAppendOnly(appendOnly)
                         .build();
 
         if (enableParallelRead) {
@@ -291,7 +299,8 @@ public DynamicTableSource copy() {
                 skipSnapshotBackfill,
                 scanNewlyAddedTableEnabled,
                 lsnCommitCheckpointsDelay,
-                assignUnboundedChunkFirst);
+                assignUnboundedChunkFirst,
+                appendOnly);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
@@ -335,7 +344,8 @@ public boolean equals(Object o) {
                 && Objects.equals(closeIdleReaders, that.closeIdleReaders)
                 && Objects.equals(skipSnapshotBackfill, that.skipSnapshotBackfill)
                 && Objects.equals(scanNewlyAddedTableEnabled, that.scanNewlyAddedTableEnabled)
-                && Objects.equals(assignUnboundedChunkFirst, that.assignUnboundedChunkFirst);
+                && Objects.equals(assignUnboundedChunkFirst, that.assignUnboundedChunkFirst)
+                && Objects.equals(appendOnly, that.appendOnly);
     }
 
     @Override
@@ -370,7 +380,8 @@ public int hashCode() {
                 closeIdleReaders,
                 skipSnapshotBackfill,
                 scanNewlyAddedTableEnabled,
-                assignUnboundedChunkFirst);
+                assignUnboundedChunkFirst,
+                appendOnly);
     }
 
     @Override
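
With the flag enabled, getChangelogMode() short-circuits to ChangelogMode.insertOnly(), so the planner sees a pure append-only stream regardless of the configured Debezium changelog mode. A quick sketch of what that mode admits:

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

public class ChangelogModeCheck {
    public static void main(String[] args) {
        ChangelogMode mode = ChangelogMode.insertOnly();
        System.out.println(mode.contains(RowKind.INSERT));        // true
        System.out.println(mode.contains(RowKind.DELETE));        // false
        System.out.println(mode.contains(RowKind.UPDATE_BEFORE)); // false
        System.out.println(mode.contains(RowKind.UPDATE_AFTER));  // false
    }
}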
