From 1cb2c02688120e4533126cc783741f15dd582260 Mon Sep 17 00:00:00 2001 From: ouyangwulin Date: Thu, 21 Aug 2025 16:11:53 +0800 Subject: [PATCH 1/3] [FLINK-38157][postgres-pipeline] supported chunk first options --- .../docs/connectors/pipeline-connectors/postgres.md | 13 ++++++++++++- .../docs/connectors/pipeline-connectors/postgres.md | 11 +++++++++++ .../postgres/factory/PostgresDataSourceFactory.java | 5 +++++ .../postgres/source/PostgresDataSourceOptions.java | 9 +++++++++ 4 files changed, 37 insertions(+), 1 deletion(-) diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md index ca76905ae37..d783efe7833 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md @@ -245,7 +245,7 @@ pipeline: 分块元数据的组大小,如果元数据大小超过该组大小,则元数据将被划分为多个组。 - + metadata.list optional false @@ -254,6 +254,17 @@ pipeline: 源记录中可读取的元数据列表,将传递给下游并在转换模块中使用,各字段以逗号分隔。可用的可读元数据包括:op_ts。 + + scan.incremental.snapshot.unbounded-chunk-first.enabled + optional + false + String + + 在快照读取阶段,是否优先分配无界分块。
+ 这有助于降低在对最大无界分块进行快照时,TaskManager 发生内存溢出(OOM)错误的风险。
此为实验性选项,默认值为 false。 + + diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md index e52fa34bbe5..120d9593564 100644 --- a/docs/content/docs/connectors/pipeline-connectors/postgres.md +++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md @@ -246,6 +246,17 @@ pipeline: List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts. + + scan.incremental.snapshot.unbounded-chunk-first.enabled + optional + false + Boolean + + Whether to assign the unbounded chunks first during snapshot reading phase.
+ This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.
+ Experimental option, defaults to false. + + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java index 61c9766c7cb..918d479f6df 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java @@ -70,6 +70,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE; @@ -120,6 +121,8 @@ public DataSource createDataSource(Context context) { double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); + boolean 
isAssignUnboundedChunkFirst = + config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); Duration connectTimeout = config.get(CONNECT_TIMEOUT); int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); @@ -165,6 +168,7 @@ public DataSource createDataSource(Context context) { .closeIdleReaders(closeIdleReaders) .skipSnapshotBackfill(skipSnapshotBackfill) .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay) + .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst) .getConfigFactory(); List tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null); @@ -252,6 +256,7 @@ public Set> optionalOptions() { options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY); options.add(METADATA_LIST); + options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java index 6d86bea3047..ec51de8b680 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java @@ -255,4 +255,13 @@ public class PostgresDataSourceOptions { .withDescription( "List of readable metadata from SourceRecord to be passed to downstream, split by `,`. 
" + "Available readable metadata are: op_ts."); + + @Experimental + public static final ConfigOption + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED = + ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false."); } From a990e0274992af57a99d2654462dd11a56243b9d Mon Sep 17 00:00:00 2001 From: ouyangwulin Date: Thu, 21 Aug 2025 18:51:50 +0800 Subject: [PATCH 2/3] add test for chunk first enable --- .../postgres/source/PostgresPipelineITCaseTest.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java index 5f3c74ae793..779e36df4f0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java @@ -47,6 +47,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,8 +134,9 @@ public void testInitialStartupMode() throws 
Exception { .containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0])); } - @Test - public void testInitialStartupModeWithOpts() throws Exception { + @ParameterizedTest(name = "chunkFirstEnable: {0}") + @ValueSource(booleans = {true, false}) + public void testInitialStartupModeWithOpts(boolean chunkFirstEnable) throws Exception { inventoryDatabase.createAndInitialize(); Configuration sourceConfiguration = new Configuration(); sourceConfiguration.set(PostgresDataSourceOptions.HOSTNAME, POSTGRES_CONTAINER.getHost()); @@ -151,6 +154,9 @@ public void testInitialStartupModeWithOpts() throws Exception { inventoryDatabase.getDatabaseName() + ".inventory.products"); sourceConfiguration.set(PostgresDataSourceOptions.SERVER_TIME_ZONE, "UTC"); sourceConfiguration.set(PostgresDataSourceOptions.METADATA_LIST, "op_ts"); + sourceConfiguration.set( + PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED, + chunkFirstEnable); Factory.Context context = new FactoryHelper.DefaultContext( From 89ff7544bd0cbb633771ebfc693d08d5e8441f2f Mon Sep 17 00:00:00 2001 From: ouyangwulin Date: Fri, 22 Aug 2025 14:31:36 +0800 Subject: [PATCH 3/3] edit varible name. 
--- .../postgres/source/PostgresPipelineITCaseTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java index 779e36df4f0..a57e1fed54f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java @@ -134,9 +134,9 @@ public void testInitialStartupMode() throws Exception { .containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0])); } - @ParameterizedTest(name = "chunkFirstEnable: {0}") + @ParameterizedTest(name = "unboundedChunkFirst: {0}") @ValueSource(booleans = {true, false}) - public void testInitialStartupModeWithOpts(boolean chunkFirstEnable) throws Exception { + public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception { inventoryDatabase.createAndInitialize(); Configuration sourceConfiguration = new Configuration(); sourceConfiguration.set(PostgresDataSourceOptions.HOSTNAME, POSTGRES_CONTAINER.getHost()); @@ -156,7 +156,7 @@ public void testInitialStartupModeWithOpts(boolean chunkFirstEnable) throws Exce sourceConfiguration.set(PostgresDataSourceOptions.METADATA_LIST, "op_ts"); sourceConfiguration.set( PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED, - chunkFirstEnable); + unboundedChunkFirst); Factory.Context context = new FactoryHelper.DefaultContext(