
[FLINK-37479][postgres] Add support for PARTITIONED TABLE #4004

Open · wants to merge 8 commits into base: master
12 changes: 12 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
@@ -281,6 +281,18 @@ Connector Options
<li>false (default): All types of messages are sent as is.</li>
</td>
</tr>
<tr>
<td>scan.include-partitioned-tables.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to read partitioned tables via their partition root.<br>
If enabled:<br>
(1) the PUBLICATION must be created beforehand with publish_via_partition_root=true;<br>
(2) the table list (regex or predefined list) should match only the parent table name; if it matches both parent and child tables, snapshot data will be read twice.
</td>
</tr>
</tbody>
</table>
</div>
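For the publication prerequisite described in the option above, a minimal JDBC setup sketch (connection details, schema, and table names are placeholders; dbz_publication matches Debezium's default publication name, but use whatever your connector is configured with):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class PublicationSetup {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; adjust for your environment.
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
                Statement stmt = conn.createStatement()) {
            // Publish only the partitioned parent. With publish_via_partition_root=true
            // (PostgreSQL 13+), changes on every partition are emitted under the
            // parent table's name.
            stmt.execute("CREATE PUBLICATION dbz_publication"
                    + " FOR TABLE inventory.products"
                    + " WITH (publish_via_partition_root = true)");
        }
    }
}
```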
12 changes: 12 additions & 0 deletions docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -278,6 +278,18 @@ SELECT * FROM shipments;
<li>false (default): All types of messages are delivered as-is.</li>
</td>
</tr>
<tr>
<td>scan.include-partitioned-tables.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to read partitioned tables via their partition root.<br>
If enabled:<br>
(1) the PUBLICATION must be created beforehand with publish_via_partition_root=true;<br>
(2) the table list (regex or predefined list) should match only the parent table name; if it matches both parent and child tables, snapshot data will be read twice.
</td>
</tr>
</tbody>
</table>
</div>
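A usage sketch of the new option in a Flink SQL DDL, executed here through the Java TableEnvironment (hostname, credentials, and the inventory.products layout are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionedTableDdlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'table-name' matches only the partitioned parent; a pattern that also
        // matched the child partitions would read snapshot data twice.
        tEnv.executeSql(
                "CREATE TABLE products ("
                        + "  id INT,"
                        + "  name STRING,"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'postgres-cdc',"
                        + "  'hostname' = 'localhost',"
                        + "  'port' = '5432',"
                        + "  'username' = 'postgres',"
                        + "  'password' = 'postgres',"
                        + "  'database-name' = 'postgres',"
                        + "  'schema-name' = 'inventory',"
                        + "  'table-name' = 'products',"
                        + "  'slot.name' = 'flink',"
                        + "  'decoding.plugin.name' = 'pgoutput',"
                        + "  'scan.incremental.snapshot.enabled' = 'true',"
                        + "  'scan.include-partitioned-tables.enabled' = 'true'"
                        + ")");
        tEnv.executeSql("SELECT * FROM products").print();
    }
}
```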
@@ -130,7 +130,8 @@ private void generateCreateTableEvent(PostgresSourceConfig sourceConfig) {
TableDiscoveryUtils.listTables(
sourceConfig.getDatabaseList().get(0),
jdbc,
sourceConfig.getTableFilters());
sourceConfig.getTableFilters(),
sourceConfig.includePartitionedTables());
for (TableId tableId : capturedTableIds) {
Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
createTableEventCache.add(
@@ -176,9 +176,14 @@ public ChunkSplitter createChunkSplitter(
@Override
public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
boolean includePartitionedTables =
((PostgresSourceConfig) sourceConfig).includePartitionedTables();
return TableDiscoveryUtils.listTables(
// there is always a single database provided
sourceConfig.getDatabaseList().get(0), jdbc, sourceConfig.getTableFilters());
sourceConfig.getDatabaseList().get(0),
jdbc,
sourceConfig.getTableFilters(),
includePartitionedTables);
} catch (SQLException e) {
throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e);
}
@@ -299,6 +299,12 @@ public PostgresSourceBuilder<T> lsnCommitCheckpointsDelay(int lsnCommitDelay) {
return this;
}

/** Whether the connector should read partitioned tables via their partition root. */
public PostgresSourceBuilder<T> includePartitionedTables(boolean includePartitionedTables) {
this.configFactory.setIncludePartitionedTables(includePartitionedTables);
return this;
}
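A DataStream-API usage sketch, assuming the standard incremental-source setup from the postgres-cdc docs; only the includePartitionedTables(true) call is new in this PR:

```java
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class PartitionedSourceSketch {
    public static PostgresIncrementalSource<String> buildSource() {
        // Placeholder connection details; tableList matches the parent table only.
        return PostgresIncrementalSource.<String>builder()
                .hostname("localhost")
                .port(5432)
                .database("postgres")
                .schemaList("inventory")
                .tableList("inventory.products")
                .username("postgres")
                .password("postgres")
                .slotName("flink")
                .decodingPluginName("pgoutput") // publication-based decoding plugin
                .deserializer(new JsonDebeziumDeserializationSchema())
                .includePartitionedTables(true) // the new flag added by this PR
                .build();
    }
}
```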

/**
* Build the {@link PostgresIncrementalSource}.
*
@@ -38,6 +38,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {

private final int subtaskId;
private final int lsnCommitCheckpointsDelay;
private final boolean includePartitionedTables;

public PostgresSourceConfig(
int subtaskId,
@@ -67,7 +68,8 @@ public PostgresSourceConfig(
boolean skipSnapshotBackfill,
boolean isScanNewlyAddedTableEnabled,
int lsnCommitCheckpointsDelay,
boolean assignUnboundedChunkFirst) {
boolean assignUnboundedChunkFirst,
boolean includePartitionedTables) {
super(
startupOptions,
databaseList,
@@ -97,6 +99,7 @@ public PostgresSourceConfig(
assignUnboundedChunkFirst);
this.subtaskId = subtaskId;
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
this.includePartitionedTables = includePartitionedTables;
}

/**
@@ -117,6 +120,15 @@ public int getLsnCommitCheckpointsDelay() {
return this.lsnCommitCheckpointsDelay;
}

/**
 * Returns the {@code includePartitionedTables} value.
 *
 * @return whether partitioned tables are read via their partition root
 */
public boolean includePartitionedTables() {
return includePartitionedTables;
}

/**
* Returns the slot name for backfill task.
*
@@ -52,6 +52,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {

private int lsnCommitCheckpointsDelay;

private boolean includePartitionedTables;

/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
@Override
public PostgresSourceConfig create(int subtaskId) {
@@ -133,7 +135,8 @@ public PostgresSourceConfig create(int subtaskId) {
skipSnapshotBackfill,
scanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay,
assignUnboundedChunkFirst);
assignUnboundedChunkFirst,
includePartitionedTables);
}

@@ -181,4 +184,9 @@ public void heartbeatInterval(Duration heartbeatInterval) {
public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) {
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
}

/** Sets whether partitioned tables are read via their partition root. */
public void setIncludePartitionedTables(boolean includePartitionedTables) {
this.includePartitionedTables = includePartitionedTables;
}
}
@@ -87,4 +87,14 @@ public class PostgresSourceOptions extends JdbcSourceOptions {
+ "By setting this to higher value, the offset that is consumed by global slot will be "
+ "committed after multiple checkpoint delays instead of after each checkpoint completion.\n"
+ "This allows continuous recycle of log files in stream phase.");

public static final ConfigOption<Boolean> SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED =
ConfigOptions.key("scan.include-partitioned-tables.enabled")
.booleanType()
.defaultValue(Boolean.FALSE)
.withDescription(
"Enable reading from partitioned table via partition root.\n"
+ "If enabled:\n"
+ "(1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true\n"
+ "(2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice.");
}
@@ -34,11 +34,17 @@ public class TableDiscoveryUtils {
private static final Logger LOG = LoggerFactory.getLogger(TableDiscoveryUtils.class);

public static List<TableId> listTables(
String database, JdbcConnection jdbc, RelationalTableFilters tableFilters)
String database,
JdbcConnection jdbc,
RelationalTableFilters tableFilters,
boolean includePartitionedTables)
throws SQLException {

Set<TableId> allTableIds =
jdbc.readTableNames(database, null, null, new String[] {"TABLE"});
@loserwang1024 (Contributor) commented on Apr 27, 2025:
I hope we can add a param such as partitioned (maybe we can check whether Debezium has one, so we can reuse it). When this param is enabled:

  1. discover partitioned tables here;
  2. add publish_via_partition_root=true when creating the publication. We could add an initRootPublication like what io.debezium.connector.postgresql.connection.PostgresReplicationConnection#initPublication does, or just modify that method (but that class is copied from Debezium, and diverging too much is not recommended).

Though this is enough to read partitioned tables now, the user has to create the publication in advance. To be honest, I hope Debezium implements it, but if it lacks it, we can do it ourselves to make things easier for the user.

Just my own thoughts. @phamvinh1712 @leonardBang, what do you think?

@phamvinh1712 (Contributor, Author) commented on Apr 27, 2025:
I know that Debezium doesn't create the publication with publish_via_partition_root=true, so users need to create it themselves in advance.
I agree that doing this would help users, but I'm afraid it would make it hard for us to upgrade the Debezium version (which I believe we need to do at some point, since Flink CDC is using the old Debezium 1.9.6).

@loserwang1024 (Contributor) replied:
There is also another concern: if the user uses a regex expression as the table name, both the parent table and its child tables will be captured, and the snapshot data will be read twice.

Thus, an option can also warn the user about this: if they enable partitioned tables, they should pay attention to it.

@phamvinh1712 (Contributor, Author) replied:

> There is also another concern: if the user uses a regex expression as the table name, both the parent table and its child tables will be captured, and the snapshot data will be read twice.
>
> Thus, an option can also warn the user about this: if they enable partitioned tables, they should pay attention to it.

Do you have any suggestions for this? Should we add a section to the Flink CDC docs about it?
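If the publication auto-initialization suggested above were implemented, it might look roughly like the following sketch (hypothetical, not part of this PR; the helper name initRootPublication and the create-or-alter behavior are assumptions, and publish_via_partition_root requires PostgreSQL 13+):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public final class RootPublicationInitializer {
    /** Create the publication if missing, or switch an existing one to root publishing. */
    public static void initRootPublication(
            Connection conn, String publicationName, String tableList) throws SQLException {
        boolean exists;
        try (PreparedStatement ps =
                conn.prepareStatement("SELECT 1 FROM pg_publication WHERE pubname = ?")) {
            ps.setString(1, publicationName);
            try (ResultSet rs = ps.executeQuery()) {
                exists = rs.next();
            }
        }
        try (Statement stmt = conn.createStatement()) {
            if (exists) {
                stmt.execute("ALTER PUBLICATION " + publicationName
                        + " SET (publish_via_partition_root = true)");
            } else {
                stmt.execute("CREATE PUBLICATION " + publicationName
                        + " FOR TABLE " + tableList
                        + " WITH (publish_via_partition_root = true)");
            }
        }
    }
}
```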

String[] tableTypes = new String[] {"TABLE"};
if (includePartitionedTables) {
tableTypes = new String[] {"TABLE", "PARTITIONED TABLE"};
}
Set<TableId> allTableIds = jdbc.readTableNames(database, null, null, tableTypes);

Set<TableId> capturedTables =
allTableIds.stream()
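For background: recent PostgreSQL JDBC drivers report a declaratively partitioned parent (relkind 'p' in pg_class) under the table type PARTITIONED TABLE rather than TABLE, which is why discovery must pass the extra type. A standalone sketch of what the driver metadata returns (connection details are placeholders):

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class InspectTableTypes {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres")) {
            DatabaseMetaData meta = conn.getMetaData();
            // With only {"TABLE"} in the filter, the partitioned parent is skipped
            // and only leaf partitions (relkind 'r') come back.
            try (ResultSet rs = meta.getTables(
                    null, "inventory", "%", new String[] {"TABLE", "PARTITIONED TABLE"})) {
                while (rs.next()) {
                    System.out.println(rs.getString("TABLE_NAME")
                            + " -> " + rs.getString("TABLE_TYPE"));
                }
            }
        }
    }
}
```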
@@ -55,6 +55,7 @@
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.PG_PORT;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
@@ -117,6 +118,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean isScanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
boolean includePartitionedTables = config.get(SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED);
boolean assignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
boolean appendOnly = config.get(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
@@ -167,7 +169,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
isScanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay,
assignUnboundedChunkFirst,
appendOnly);
appendOnly,
includePartitionedTables);
}

@Override
@@ -212,6 +215,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
options.add(SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED);
return options;
}

@@ -87,6 +87,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
private final int lsnCommitCheckpointsDelay;
private final boolean assignUnboundedChunkFirst;
private final boolean appendOnly;
private final boolean includePartitionedTables;

// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -128,7 +129,8 @@ public PostgreSQLTableSource(
boolean isScanNewlyAddedTableEnabled,
int lsnCommitCheckpointsDelay,
boolean assignUnboundedChunkFirst,
boolean appendOnly) {
boolean appendOnly,
boolean includePartitionedTables) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -162,6 +164,7 @@ public PostgreSQLTableSource(
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
this.appendOnly = appendOnly;
this.includePartitionedTables = includePartitionedTables;
}

@Override
@@ -230,6 +233,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
.assignUnboundedChunkFirst(assignUnboundedChunkFirst)
.includePartitionedTables(includePartitionedTables)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -300,7 +304,8 @@ public DynamicTableSource copy() {
scanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay,
assignUnboundedChunkFirst,
appendOnly);
appendOnly,
includePartitionedTables);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -345,7 +350,8 @@ public boolean equals(Object o) {
&& Objects.equals(skipSnapshotBackfill, that.skipSnapshotBackfill)
&& Objects.equals(scanNewlyAddedTableEnabled, that.scanNewlyAddedTableEnabled)
&& Objects.equals(assignUnboundedChunkFirst, that.assignUnboundedChunkFirst)
&& Objects.equals(appendOnly, that.appendOnly);
&& Objects.equals(appendOnly, that.appendOnly)
&& Objects.equals(includePartitionedTables, that.includePartitionedTables);
}

@Override
@@ -381,7 +387,8 @@ public int hashCode() {
skipSnapshotBackfill,
scanNewlyAddedTableEnabled,
assignUnboundedChunkFirst,
appendOnly);
appendOnly,
includePartitionedTables);
}

@Override
@@ -46,6 +46,14 @@ class PostgresDialectTest extends PostgresTestBase {
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());

private final UniqueDatabase inventoryPartitionedDatabase =
new UniqueDatabase(
POSTGRES_CONTAINER,
"postgres3",
"inventory_partitioned",
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());

@Test
void testDiscoverDataCollectionsInMultiDatabases() {

@@ -88,4 +96,23 @@ void testDiscoverDataCollectionsInMultiDatabases() {
configFactoryOfInventoryDatabase2.create(0));
Assertions.assertThat(tableIdsOfInventoryDatabase2).isEmpty();
}

@Test
void testDiscoverDataCollectionsForPartitionedTable() {
// initialize the database with a partitioned table
inventoryPartitionedDatabase.createAndInitialize();

// get table named 'inventory_partitioned.products' from inventoryPartitionedDatabase
PostgresSourceConfigFactory configFactoryOfInventoryPartitionedDatabase =
getMockPostgresSourceConfigFactory(
inventoryPartitionedDatabase, "inventory_partitioned", "products", 10);
configFactoryOfInventoryPartitionedDatabase.setIncludePartitionedTables(true);
PostgresDialect dialectOfInventoryPartitionedDatabase =
new PostgresDialect(configFactoryOfInventoryPartitionedDatabase.create(0));
List<TableId> tableIdsOfInventoryPartitionedDatabase =
dialectOfInventoryPartitionedDatabase.discoverDataCollections(
configFactoryOfInventoryPartitionedDatabase.create(0));
Assertions.assertThat(tableIdsOfInventoryPartitionedDatabase.get(0))
.hasToString("inventory_partitioned.products");
}
}
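The postgres3/inventory_partitioned DDL fixture referenced by the test is not shown in this diff; a minimal shape that would satisfy it might look like the following JDBC setup (the range-partitioning scheme and column set are assumptions):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class InventoryPartitionedSetup {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details.
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
                Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE SCHEMA IF NOT EXISTS inventory_partitioned");
            // The parent is discovered only when "PARTITIONED TABLE" is part of
            // the table-type filter.
            stmt.execute("CREATE TABLE inventory_partitioned.products ("
                    + " id INT NOT NULL,"
                    + " name VARCHAR(255),"
                    + " PRIMARY KEY (id)"
                    + ") PARTITION BY RANGE (id)");
            // Leaf partitions hold the actual rows.
            stmt.execute("CREATE TABLE inventory_partitioned.products_0_999"
                    + " PARTITION OF inventory_partitioned.products"
                    + " FOR VALUES FROM (0) TO (1000)");
        }
    }
}
```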
@@ -67,7 +67,8 @@ public MockPostgreSQLTableSource(PostgreSQLTableSource postgreSQLTableSource) {
(boolean) get(postgreSQLTableSource, "scanNewlyAddedTableEnabled"),
(int) get(postgreSQLTableSource, "lsnCommitCheckpointsDelay"),
(boolean) get(postgreSQLTableSource, "assignUnboundedChunkFirst"),
(boolean) get(postgreSQLTableSource, "appendOnly"));
(boolean) get(postgreSQLTableSource, "appendOnly"),
(boolean) get(postgreSQLTableSource, "includePartitionedTables"));
}

@Override