[FLINK-38157][postgres-pipeline] Support unbounded chunk first option #4103


Merged · 3 commits · Aug 22, 2025
13 changes: 12 additions & 1 deletion docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -245,7 +245,7 @@ pipeline:
The group size of chunk meta; if the meta size exceeds the group size, the meta will be divided into multiple groups.
</td>
</tr>
<tr>
<td>metadata.list</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
@@ -254,6 +254,17 @@
List of readable metadata from SourceRecord to be passed to downstream and used in the transform module, split by `,`. Available readable metadata are: op_ts.
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to assign the unbounded chunks first during the snapshot reading phase.<br>
This helps reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
This is an experimental option, and it defaults to false.
</td>
</tr>
</tbody>
</table>
</div>
11 changes: 11 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -246,6 +246,17 @@ pipeline:
List of readable metadata from SourceRecord to be passed to downstream and used in the transform module, split by `,`. Available readable metadata are: op_ts.
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to assign the unbounded chunks first during the snapshot reading phase.<br>
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
Experimental option, defaults to false.
</td>
</tr>
</tbody>
</table>
</div>
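For reference, a minimal pipeline definition enabling this option might look like the sketch below. The hostname, credentials, slot name, and table name are illustrative placeholders, not values taken from this change:

```yaml
source:
  type: postgres
  hostname: localhost
  port: 5432
  username: postgres
  password: postgres
  tables: app_db.inventory.products
  slot.name: flink_cdc_slot
  # Experimental: hand out the unbounded chunk before the bounded ones
  # during the snapshot phase (defaults to false).
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true

sink:
  type: values

pipeline:
  name: Postgres to Values Pipeline
  parallelism: 2
```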
@@ -70,6 +70,7 @@
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE;
@@ -120,6 +121,8 @@ public DataSource createDataSource(Context context) {
double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);

boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean isAssignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);

Duration connectTimeout = config.get(CONNECT_TIMEOUT);
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
@@ -165,6 +168,7 @@ public DataSource createDataSource(Context context) {
.closeIdleReaders(closeIdleReaders)
.skipSnapshotBackfill(skipSnapshotBackfill)
.lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
.getConfigFactory();

List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);
@@ -252,6 +256,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
options.add(METADATA_LIST);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
return options;
}

@@ -255,4 +255,13 @@ public class PostgresDataSourceOptions {
.withDescription(
"List of readable metadata from SourceRecord to be passed to downstream, split by `,`. "
+ "Available readable metadata are: op_ts.");

@Experimental
public static final ConfigOption<Boolean>
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED =
ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
}
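To make the behavior behind this flag concrete, here is a hedged, self-contained sketch of the "unbounded chunk first" idea. It is not the connector's actual splitter or assigner code; `Chunk` and `orderChunks` are hypothetical names used only for illustration. A snapshot splitter typically produces bounded chunks plus one final chunk with no upper bound, and enabling the option moves that unbounded chunk to the head of the assignment queue:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not connector code: reorder snapshot chunks so the
// unbounded chunk (the one without an upper bound) is assigned first.
public final class UnboundedChunkFirstSketch {

    /** Minimal stand-in for a snapshot split; a null end means "unbounded". */
    static final class Chunk {
        final Object start; // null for the first chunk (no lower bound)
        final Object end;   // null for the last chunk (no upper bound)

        Chunk(Object start, Object end) {
            this.start = start;
            this.end = end;
        }

        boolean isUnbounded() {
            return end == null;
        }
    }

    static List<Chunk> orderChunks(List<Chunk> chunks, boolean unboundedChunkFirst) {
        if (!unboundedChunkFirst) {
            return chunks; // default: keep the splitter's natural order
        }
        List<Chunk> reordered = new ArrayList<>(chunks.size());
        // The unbounded chunk goes to the front of the assignment queue...
        for (Chunk chunk : chunks) {
            if (chunk.isUnbounded()) {
                reordered.add(chunk);
            }
        }
        // ...followed by the bounded chunks in their original order.
        for (Chunk chunk : chunks) {
            if (!chunk.isUnbounded()) {
                reordered.add(chunk);
            }
        }
        return reordered;
    }
}
```

Under this reading, a failure on an oversized unbounded chunk surfaces at the start of the snapshot phase, rather than after all bounded chunks have already been copied.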
@@ -47,6 +47,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -132,8 +134,9 @@ public void testInitialStartupMode() throws Exception {
.containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0]));
}

@Test
public void testInitialStartupModeWithOpts() throws Exception {
@ParameterizedTest(name = "unboundedChunkFirst: {0}")
@ValueSource(booleans = {true, false})
public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
inventoryDatabase.createAndInitialize();
Configuration sourceConfiguration = new Configuration();
sourceConfiguration.set(PostgresDataSourceOptions.HOSTNAME, POSTGRES_CONTAINER.getHost());
@@ -151,6 +154,9 @@ public void testInitialStartupModeWithOpts() throws Exception {
inventoryDatabase.getDatabaseName() + ".inventory.products");
sourceConfiguration.set(PostgresDataSourceOptions.SERVER_TIME_ZONE, "UTC");
sourceConfiguration.set(PostgresDataSourceOptions.METADATA_LIST, "op_ts");
sourceConfiguration.set(
PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED,
unboundedChunkFirst);

Factory.Context context =
new FactoryHelper.DefaultContext(