From cdfd95ca339cc4495a92265ddeb9bbcb54e093e9 Mon Sep 17 00:00:00 2001 From: Vinh Pham Date: Sat, 26 Apr 2025 22:53:58 +0100 Subject: [PATCH 1/6] [FLINK-37479][postgres] Add support for PARTITIONED TABLE --- .../source/utils/TableDiscoveryUtils.java | 3 +- .../postgres/source/PostgresDialectTest.java | 26 ++++ .../table/PostgreSQLConnectorITCase.java | 112 ++++++++++++++++++ .../resources/ddl/inventory_partitioned.sql | 48 ++++++++ 4 files changed, 188 insertions(+), 1 deletion(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_partitioned.sql diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java index 181c2cd0d01..81c710b1cab 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java @@ -38,7 +38,8 @@ public static List listTables( throws SQLException { Set allTableIds = - jdbc.readTableNames(database, null, null, new String[] {"TABLE"}); + jdbc.readTableNames( + database, null, null, new String[] {"TABLE", "PARTITIONED TABLE"}); Set capturedTables = allTableIds.stream() diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java index 898cec1fb08..32197c116db 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java @@ -46,6 +46,14 @@ class PostgresDialectTest extends PostgresTestBase { POSTGRES_CONTAINER.getUsername(), POSTGRES_CONTAINER.getPassword()); + private final UniqueDatabase inventoryPartitionedDatabase = + new UniqueDatabase( + POSTGRES_CONTAINER, + "postgres3", + "inventory_partitioned", + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword()); + @Test void testDiscoverDataCollectionsInMultiDatabases() { @@ -88,4 +96,22 @@ void testDiscoverDataCollectionsInMultiDatabases() { configFactoryOfInventoryDatabase2.create(0)); Assertions.assertThat(tableIdsOfInventoryDatabase2).isEmpty(); } + + @Test + void testDiscoverDataCollectionsForPartitionedTable() { + // initial database with partitioned table + inventoryPartitionedDatabase.createAndInitialize(); + + // get table named 'inventory_partitioned.products' from inventoryPartitionedDatabase + PostgresSourceConfigFactory configFactoryOfInventoryPartitionedDatabase = + getMockPostgresSourceConfigFactory( + inventoryPartitionedDatabase, "inventory_partitioned", "products", 10); + PostgresDialect dialectOfInventoryPartitionedDatabase = + new PostgresDialect(configFactoryOfInventoryPartitionedDatabase.create(0)); + List tableIdsOfInventoryPartitionedDatabase = + dialectOfInventoryPartitionedDatabase.discoverDataCollections( + configFactoryOfInventoryPartitionedDatabase.create(0)); + Assertions.assertThat(tableIdsOfInventoryPartitionedDatabase.get(0)) + .hasToString("inventory_partitioned.products"); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index ade1b215fb5..6722a7977e0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -232,6 +232,118 @@ void testConsumingAllEvents(boolean parallelismSnapshot) result.getJobClient().get().cancel().get(); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testConsumingAllEventsForPartitionedTable(boolean parallelismSnapshot) + throws SQLException, ExecutionException, InterruptedException { + setup(parallelismSnapshot); + initializePostgresTable(POSTGRES_CONTAINER, "inventory_partitioned"); + String publicationName = "dbz_publication_" + new Random().nextInt(1000); + String slotName = getSlotName(); + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "CREATE PUBLICATION %s FOR TABLE inventory_partitioned.products " + + " WITH (publish_via_partition_root=true)", + publicationName)); + statement.execute( + String.format( + "select pg_create_logical_replication_slot('%s','pgoutput');", + slotName)); + } + + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " id INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)," + + " country STRING" + + ") WITH (" + + " 'connector' = 'postgres-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'decoding.plugin.name' = 'pgoutput', " + + " 'debezium.publication.name' = '%s'," + + " 'slot.name' = '%s'" + + ")", + POSTGRES_CONTAINER.getHost(), + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword(), + POSTGRES_CONTAINER.getDatabaseName(), + "inventory_partitioned", + "products", + parallelismSnapshot, + publicationName, + slotName); + String sinkDDL = + "CREATE TABLE sink (" + + " id INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)," + + " country STRING," + + " PRIMARY KEY (id, country) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'," + + " 'sink-expected-messages-num' = '20'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = + tEnv.executeSql( + "INSERT INTO sink SELECT id, name, description, weight, country FROM debezium_source"); + + waitForSnapshotStarted("sink"); + + // wait a bit to make sure the replication slot is ready + Thread.sleep(5000); + + // generate WAL + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); + Statement statement = connection.createStatement()) { + statement.execute( + "INSERT INTO inventory_partitioned.products VALUES (default,'jacket','water resistent white wind breaker',0.2, 'us');"); // 110 + statement.execute( + "INSERT INTO inventory_partitioned.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, 'uk');"); + } + + waitForSinkSize("sink", 11); + + // consume both snapshot and wal events + String[] expected = + new String[] { + "101,scooter,Small 2-wheel scooter,3.140,us", + "102,car battery,12V car battery,8.100,us", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800,us", + "104,hammer,12oz carpenter's hammer,0.750,us", + "105,hammer,14oz carpenter's hammer,0.875,us", + "106,hammer,16oz carpenter's hammer,1.000,uk", + "107,rocks,box of assorted rocks,5.300,uk", + "108,jacket,water resistent black wind breaker,0.100,uk", + "109,spare tire,24 inch spare tire,22.200,uk", + "110,jacket,water resistent white wind breaker,0.200,us", + "111,scooter,Big 2-wheel scooter ,5.180,uk" + }; + + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); + Assertions.assertThat(actual).containsExactlyInAnyOrder(expected); + + result.getJobClient().get().cancel().get(); + } + @ParameterizedTest @ValueSource(booleans = {true}) void testStartupFromLatestOffset(boolean parallelismSnapshot) throws Exception { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_partitioned.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_partitioned.sql new file mode 100644 index 00000000000..1cffc45c23d --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_partitioned.sql @@ -0,0 +1,48 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- Create the schema that we'll use to populate data and watch the effect in the WAL +DROP SCHEMA IF EXISTS inventory_partitioned CASCADE; +CREATE SCHEMA inventory_partitioned; +SET search_path TO inventory_partitioned; + +-- Create and populate our products using a single insert with many rows +CREATE TABLE products ( + id SERIAL NOT NULL, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT, + country VARCHAR(20) NOT NULL, + PRIMARY KEY (id, country) +) PARTITION BY LIST(country); +ALTER SEQUENCE products_id_seq RESTART WITH 101; +ALTER TABLE products REPLICA IDENTITY FULL; + +CREATE TABLE products_uk PARTITION OF products + FOR VALUES IN ('uk'); + +CREATE TABLE products_us PARTITION OF products + FOR VALUES IN ('us'); + +INSERT INTO products +VALUES (default,'scooter','Small 2-wheel scooter',3.14, 'us'), + (default,'car battery','12V car battery',8.1, 'us'), + (default,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8, 'us'), + (default,'hammer','12oz carpenter''s hammer',0.75, 'us'), + (default,'hammer','14oz carpenter''s hammer',0.875, 'us'), + (default,'hammer','16oz carpenter''s hammer',1.0, 'uk'), + (default,'rocks','box of assorted rocks',5.3, 'uk'), + (default,'jacket','water resistent black wind breaker',0.1, 'uk'), + (default,'spare tire','24 inch spare tire',22.2, 'uk'); \ No newline at end of file From 6d6a18bb8e28af5d2fc6ad6a246db38c2987ece3 Mon Sep 17 00:00:00 2001 From: Vinh Pham Date: Sun, 4 May 2025 20:16:14 +0100 Subject: [PATCH 2/6] [FLINK-37479][postgres] Add test to capture change from new partition --- .../postgres/table/PostgreSQLConnectorITCase.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 6722a7977e0..f9025cc2384 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -318,6 +318,10 @@ void testConsumingAllEventsForPartitionedTable(boolean parallelismSnapshot) "INSERT INTO inventory_partitioned.products VALUES (default,'jacket','water resistent white wind breaker',0.2, 'us');"); // 110 statement.execute( "INSERT INTO inventory_partitioned.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, 'uk');"); + statement.execute( + "CREATE TABLE inventory_partitioned.products_china PARTITION OF inventory_partitioned.products FOR VALUES IN ('china');"); + statement.execute( + "INSERT INTO inventory_partitioned.products VALUES (default,'bike','Big 2-wheel bycicle ',6.18, 'china');"); } waitForSinkSize("sink", 11); @@ -335,7 +339,8 @@ void testConsumingAllEventsForPartitionedTable(boolean parallelismSnapshot) "108,jacket,water resistent black wind breaker,0.100,uk", "109,spare tire,24 inch spare tire,22.200,uk", "110,jacket,water resistent white wind breaker,0.200,us", - "111,scooter,Big 2-wheel scooter ,5.180,uk" + "111,scooter,Big 2-wheel scooter ,5.180,uk", + "112,bike,Big 2-wheel bycicle ,6.180,china" }; List actual = TestValuesTableFactory.getResultsAsStrings("sink"); From b542cf9bceefb37a3a70101dd6422d26caee6d84 Mon Sep 17 00:00:00 2001 From: Vinh Pham Date: Mon, 26 May 2025 23:52:08 +0100 Subject: [PATCH 3/6] [FLINK-37479][postgres] Add option to enable partitioned table --- .../connectors/flink-sources/postgres-cdc.md | 12 ++++++++++++ .../connectors/flink-sources/postgres-cdc.md | 12 ++++++++++++ .../postgres/source/PostgresDialect.java | 7 ++++++- .../postgres/source/PostgresSourceBuilder.java | 6 ++++++ .../source/config/PostgresSourceConfig.java | 14 +++++++++++++- .../config/PostgresSourceConfigFactory.java | 10 +++++++++- .../source/config/PostgresSourceOptions.java | 10 ++++++++++ .../source/utils/TableDiscoveryUtils.java | 13 +++++++++---- .../postgres/table/PostgreSQLTableFactory.java | 6 +++++- .../postgres/table/PostgreSQLTableSource.java | 9 +++++++-- .../postgres/source/PostgresDialectTest.java | 1 + .../table/MockPostgreSQLTableSource.java | 3 ++- .../table/PostgreSQLConnectorITCase.java | 1 + .../table/PostgreSQLTableFactoryTest.java | 16 +++++++++++----- 14 files changed, 104 insertions(+), 16 deletions(-) diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md index 25bfacede4e..e72804950be 100644 --- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md @@ -363,6 +363,18 @@ The following options is available only when `scan.incremental.snapshot.enabled= The table chunks would use evenly calculation optimization when the data distribution is even, and the query for splitting would happen when it is uneven. The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount. + + include-partitioned-table + optional + false + Boolean + + Whether to enable reading partitioned table.
+ If enabled: + (1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true + (2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice. + + diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md index b8e476801d5..10ec86132dd 100644 --- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md @@ -253,6 +253,18 @@ SELECT * FROM shipments; Experimental option, defaults to false. + + include-partitioned-table + optional + false + Boolean + + Whether to enable reading partitioned table.
+ If enabled: + (1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true + (2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice. + + diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java index 000cfa9c4da..cf5240d718e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java @@ -176,9 +176,14 @@ public ChunkSplitter createChunkSplitter( @Override public List discoverDataCollections(JdbcSourceConfig sourceConfig) { try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { + boolean includePartitionedTable = + ((PostgresSourceConfig) sourceConfig).getIncludePartitionedTable(); return TableDiscoveryUtils.listTables( // there is always a single database provided - sourceConfig.getDatabaseList().get(0), jdbc, sourceConfig.getTableFilters()); + sourceConfig.getDatabaseList().get(0), + jdbc, + sourceConfig.getTableFilters(), + includePartitionedTable); } catch (SQLException e) { throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java index 5ce690f0a3e..c10d4faa703 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java @@ -289,6 +289,12 @@ public PostgresSourceBuilder lsnCommitCheckpointsDelay(int lsnCommitDelay) { return this; } + /** Whether the connector should read from partitioned table or not */ + public PostgresSourceBuilder includePartitionedTable(boolean includePartitionedTable) { + this.configFactory.setIncludePartitionedTable(includePartitionedTable); + return this; + } + /** * Build the {@link PostgresIncrementalSource}. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java index 2be7e2637df..3f42bd14d53 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java @@ -38,6 +38,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig { private final int subtaskId; private final int lsnCommitCheckpointsDelay; + private final boolean includePartitionedTable; public PostgresSourceConfig( int subtaskId, @@ -67,7 +68,8 @@ public PostgresSourceConfig( boolean skipSnapshotBackfill, boolean isScanNewlyAddedTableEnabled, int lsnCommitCheckpointsDelay, - boolean assignUnboundedChunkFirst) { + boolean assignUnboundedChunkFirst, + boolean includePartitionedTable) { super( startupOptions, databaseList, @@ -97,6 +99,7 @@ public PostgresSourceConfig( assignUnboundedChunkFirst); this.subtaskId = subtaskId; this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay; + this.includePartitionedTable = includePartitionedTable; } /** @@ -117,6 +120,15 @@ public int getLsnCommitCheckpointsDelay() { return this.lsnCommitCheckpointsDelay; } + /** + * Returns {@code includePartitionedTable} value. + * + * @return include partitioned table + */ + public boolean getIncludePartitionedTable() { + return includePartitionedTable; + } + /** * Returns the slot name for backfill task. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java index 69dea146e5c..5835db6e6eb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java @@ -52,6 +52,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory { private int lsnCommitCheckpointsDelay; + private boolean includePartitionedTable; + /** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */ @Override public PostgresSourceConfig create(int subtaskId) { @@ -133,7 +135,8 @@ public PostgresSourceConfig create(int subtaskId) { skipSnapshotBackfill, scanNewlyAddedTableEnabled, lsnCommitCheckpointsDelay, - assignUnboundedChunkFirst); + assignUnboundedChunkFirst, + includePartitionedTable); } /** @@ -182,4 +185,9 @@ public void heartbeatInterval(Duration heartbeatInterval) { public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) { this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay; } + + /** Enable include partitioned table */ + public void setIncludePartitionedTable(boolean includePartitionedTable) { + this.includePartitionedTable = includePartitionedTable; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java index 0443fa2e195..f90d0965d15 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java @@ -88,4 +88,14 @@ public class PostgresSourceOptions extends JdbcSourceOptions { + "By setting this to higher value, the offset that is consumed by global slot will be " + "committed after multiple checkpoint delays instead of after each checkpoint completion.\n" + "This allows continuous recycle of log files in stream phase."); + + public static final ConfigOption INCLUDE_PARTITIONED_TABLE = + ConfigOptions.key("include-partitioned-table") + .booleanType() + .defaultValue(Boolean.FALSE) + .withDescription( + "Enable reading from partitioned table.\n" + + "If enabled:\n" + + "(1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true\n" + + "(2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java index 81c710b1cab..e7cd2ee8688 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java @@ -34,12 +34,17 @@ public class TableDiscoveryUtils { private static final Logger LOG = LoggerFactory.getLogger(TableDiscoveryUtils.class); public static List listTables( - String database, JdbcConnection jdbc, RelationalTableFilters tableFilters) + String database, + JdbcConnection jdbc, + RelationalTableFilters tableFilters, + boolean includePartitionedTable) throws SQLException { - Set allTableIds = - jdbc.readTableNames( - database, null, null, new String[] {"TABLE", "PARTITIONED TABLE"}); + String[] tableTypes = new String[] {"TABLE"}; + if (includePartitionedTable) { + tableTypes = new String[] {"TABLE", "PARTITIONED TABLE"}; + } + Set allTableIds = jdbc.readTableNames(database, null, null, tableTypes); Set capturedTables = allTableIds.stream() diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java index bd069391df1..b4246f326da 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java @@ -54,6 +54,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_TIMEOUT; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL; +import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.INCLUDE_PARTITIONED_TABLE; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.PG_PORT; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; @@ -117,6 +118,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); boolean isScanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY); + boolean includePartitionedTable = config.get(INCLUDE_PARTITIONED_TABLE); boolean assignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); @@ -165,7 +167,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c skipSnapshotBackfill, isScanNewlyAddedTableEnabled, lsnCommitCheckpointsDelay, - assignUnboundedChunkFirst); + assignUnboundedChunkFirst, + includePartitionedTable); } @Override @@ -209,6 +212,7 @@ public Set> optionalOptions() { options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED); options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY); options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); + options.add(INCLUDE_PARTITIONED_TABLE); return options; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java index 05553a79591..c92dc9bb026 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java @@ -86,6 +86,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe private final boolean scanNewlyAddedTableEnabled; private final int lsnCommitCheckpointsDelay; private final boolean assignUnboundedChunkFirst; + private final boolean includePartitionedTable; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -126,7 +127,8 @@ public PostgreSQLTableSource( boolean skipSnapshotBackfill, boolean isScanNewlyAddedTableEnabled, int lsnCommitCheckpointsDelay, - boolean assignUnboundedChunkFirst) { + boolean assignUnboundedChunkFirst, + boolean includePartitionedTable) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -159,6 +161,7 @@ public PostgreSQLTableSource( this.scanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled; this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay; this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; + this.includePartitionedTable = includePartitionedTable; } @Override @@ -222,6 +225,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay) .assignUnboundedChunkFirst(assignUnboundedChunkFirst) + .includePartitionedTable(includePartitionedTable) .build(); return SourceProvider.of(parallelSource); } else { @@ -291,7 +295,8 @@ public DynamicTableSource copy() { skipSnapshotBackfill, scanNewlyAddedTableEnabled, lsnCommitCheckpointsDelay, - assignUnboundedChunkFirst); + assignUnboundedChunkFirst, + includePartitionedTable); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java index 32197c116db..957ca2cc48e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java @@ -106,6 +106,7 @@ void testDiscoverDataCollectionsForPartitionedTable() { PostgresSourceConfigFactory configFactoryOfInventoryPartitionedDatabase = getMockPostgresSourceConfigFactory( inventoryPartitionedDatabase, "inventory_partitioned", "products", 10); + configFactoryOfInventoryPartitionedDatabase.setIncludePartitionedTable(true); PostgresDialect dialectOfInventoryPartitionedDatabase = new PostgresDialect(configFactoryOfInventoryPartitionedDatabase.create(0)); List tableIdsOfInventoryPartitionedDatabase = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java index b19f206f544..27d364130b3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java @@ -66,7 +66,8 @@ public MockPostgreSQLTableSource(PostgreSQLTableSource postgreSQLTableSource) { (boolean) get(postgreSQLTableSource, "skipSnapshotBackfill"), (boolean) get(postgreSQLTableSource, "scanNewlyAddedTableEnabled"), (int) get(postgreSQLTableSource, "lsnCommitCheckpointsDelay"), - (boolean) get(postgreSQLTableSource, "assignUnboundedChunkFirst")); + (boolean) get(postgreSQLTableSource, "assignUnboundedChunkFirst"), + (boolean) get(postgreSQLTableSource, "includePartitionedTable")); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index f9025cc2384..d9d61984534 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -271,6 +271,7 @@ void testConsumingAllEventsForPartitionedTable(boolean parallelismSnapshot) + " 'schema-name' = '%s'," + " 'table-name' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'include-partitioned-table' = 'true'," + " 'decoding.plugin.name' = 'pgoutput', " + " 'debezium.publication.name' = '%s'," + " 'slot.name' = '%s'" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java index e0977cdf0b9..64ec2b50c35 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java @@ -63,6 +63,7 @@ import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL; +import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.INCLUDE_PARTITIONED_TABLE; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY; import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction; @@ -153,7 +154,8 @@ void testCommonProperties() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(), SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), + INCLUDE_PARTITIONED_TABLE.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -200,7 +202,8 @@ void testOptionalProperties() { true, true, SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), + INCLUDE_PARTITIONED_TABLE.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -245,7 +248,8 @@ void testMetadataColumns() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(), SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), + INCLUDE_PARTITIONED_TABLE.defaultValue()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name"); @@ -300,7 +304,8 @@ void testEnableParallelReadSource() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(), SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), + INCLUDE_PARTITIONED_TABLE.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -345,7 +350,8 @@ void testStartupFromLatestOffset() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(), SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), + INCLUDE_PARTITIONED_TABLE.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } From b541e0e1342505994ed5d5c6582799d108d443a3 Mon Sep 17 00:00:00 2001 From: Vinh Pham Date: Sat, 31 May 2025 21:18:49 +0100 Subject: [PATCH 4/6] [FLINK-37479][postgres] Rename configuration --- .../docs/connectors/flink-sources/postgres-cdc.md | 4 ++-- .../docs/connectors/flink-sources/postgres-cdc.md | 4 ++-- .../connectors/postgres/source/PostgresDialect.java | 6 +++--- .../postgres/source/PostgresSourceBuilder.java | 6 +++--- .../postgres/source/config/PostgresSourceConfig.java | 12 ++++++------ .../source/config/PostgresSourceConfigFactory.java | 8 ++++---- .../source/config/PostgresSourceOptions.java | 6 +++--- .../postgres/source/utils/TableDiscoveryUtils.java | 4 ++-- .../postgres/table/PostgreSQLTableFactory.java | 8 ++++---- .../postgres/table/PostgreSQLTableSource.java | 10 +++++----- .../postgres/source/PostgresDialectTest.java | 2 +- .../postgres/table/MockPostgreSQLTableSource.java | 2 +- .../postgres/table/PostgreSQLConnectorITCase.java | 2 +- .../postgres/table/PostgreSQLTableFactoryTest.java | 12 ++++++------ 14 files changed, 43 insertions(+), 43 deletions(-) diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md index 1050884375c..fe7eb2cd2e8 100644 --- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md @@ -379,12 +379,12 @@ The following options is available only when `scan.incremental.snapshot.enabled= The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount. - include-partitioned-table + scan.publish-via-partition-root.enabled optional false Boolean - Whether to enable reading partitioned table.
+ Whether to enable reading partitioned tables via partition root.
If enabled: (1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true (2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice. diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md index 1ecb5c1e66f..b3b0c5b9d5e 100644 --- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md @@ -266,12 +266,12 @@ SELECT * FROM shipments; - include-partitioned-table + scan.publish-via-partition-root.enabled optional false Boolean - Whether to enable reading partitioned table.
+ Whether to enable reading partitioned tables via partition root.
If enabled: (1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true (2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice. diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java index cf5240d718e..c0b16104e44 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java @@ -176,14 +176,14 @@ public ChunkSplitter createChunkSplitter( @Override public List discoverDataCollections(JdbcSourceConfig sourceConfig) { try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { - boolean includePartitionedTable = - ((PostgresSourceConfig) sourceConfig).getIncludePartitionedTable(); + boolean publishViaPartitionRoot = + ((PostgresSourceConfig) sourceConfig).getPublishViaPartitionRoot(); return TableDiscoveryUtils.listTables( // there is always a single database provided sourceConfig.getDatabaseList().get(0), jdbc, sourceConfig.getTableFilters(), - includePartitionedTable); + publishViaPartitionRoot); } catch (SQLException e) { throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java index c10d4faa703..030b7763aaf 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java @@ -289,9 +289,9 @@ public PostgresSourceBuilder lsnCommitCheckpointsDelay(int lsnCommitDelay) { return this; } - /** Whether the connector should read from partitioned table or not */ - public PostgresSourceBuilder includePartitionedTable(boolean includePartitionedTable) { - this.configFactory.setIncludePartitionedTable(includePartitionedTable); + /** Whether the connector should read partitioned tables via partition root or not */ + public PostgresSourceBuilder publishViaPartitionRoot(boolean publishViaPartitionRoot) { + this.configFactory.setPublishViaPartitionRoot(publishViaPartitionRoot); return this; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java index 3f42bd14d53..de3e09994b5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java @@ -38,7 +38,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig { private final int subtaskId; private final int lsnCommitCheckpointsDelay; - private final boolean includePartitionedTable; + private final boolean publishViaPartitionRoot; public PostgresSourceConfig( int subtaskId, @@ -69,7 +69,7 @@ public PostgresSourceConfig( boolean isScanNewlyAddedTableEnabled, int lsnCommitCheckpointsDelay, boolean assignUnboundedChunkFirst, - boolean includePartitionedTable) { + boolean publishViaPartitionRoot) { super( startupOptions, databaseList, @@ -99,7 +99,7 @@ public PostgresSourceConfig( assignUnboundedChunkFirst); this.subtaskId = subtaskId; this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay; - this.includePartitionedTable = includePartitionedTable; + this.publishViaPartitionRoot = publishViaPartitionRoot; } /** @@ -121,12 +121,12 @@ public int getLsnCommitCheckpointsDelay() { } /** - * Returns {@code includePartitionedTable} value. + * Returns {@code publishViaPartitionRoot} value. * * @return include partitioned table */ - public boolean getIncludePartitionedTable() { - return includePartitionedTable; + public boolean getPublishViaPartitionRoot() { + return publishViaPartitionRoot; } /** diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java index 5835db6e6eb..ab9a15b12c6 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java @@ -52,7 +52,7 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory { private int lsnCommitCheckpointsDelay; - private boolean includePartitionedTable; + private boolean publishViaPartitionRoot; /** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */ @Override @@ -136,7 +136,7 @@ public PostgresSourceConfig create(int subtaskId) { scanNewlyAddedTableEnabled, lsnCommitCheckpointsDelay, assignUnboundedChunkFirst, - includePartitionedTable); + publishViaPartitionRoot); } /** @@ -187,7 +187,7 @@ public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) { } /** Enable include partitioned table */ - public void setIncludePartitionedTable(boolean includePartitionedTable) { - this.includePartitionedTable = includePartitionedTable; + public void setPublishViaPartitionRoot(boolean publishViaPartitionRoot) { + this.publishViaPartitionRoot = publishViaPartitionRoot; } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java index f90d0965d15..927ddd73ae0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java @@ -89,12 +89,12 @@ public class PostgresSourceOptions extends JdbcSourceOptions { + "committed after multiple checkpoint delays instead of after each checkpoint completion.\n" + "This allows continuous recycle of log files in stream phase."); - public static final ConfigOption INCLUDE_PARTITIONED_TABLE = - ConfigOptions.key("include-partitioned-table") + public static final ConfigOption SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED = + ConfigOptions.key("scan.publish-via-partition-root.enabled") .booleanType() .defaultValue(Boolean.FALSE) .withDescription( - "Enable reading from partitioned table.\n" + "Enable reading from partitioned table via partition root.\n" + "If enabled:\n" + "(1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true\n" + "(2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice."); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java index e7cd2ee8688..f25f40d2694 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java @@ -37,11 +37,11 @@ public static List listTables( String database, JdbcConnection jdbc, RelationalTableFilters tableFilters, - boolean includePartitionedTable) + boolean publishViaPartitionRoot) throws SQLException { String[] tableTypes = new String[] {"TABLE"}; - if (includePartitionedTable) { + if (publishViaPartitionRoot) { tableTypes = new String[] {"TABLE", "PARTITIONED TABLE"}; } Set allTableIds = jdbc.readTableNames(database, null, null, tableTypes); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java index b4246f326da..f1e105c04ee 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java @@ -54,12 +54,12 @@ import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_TIMEOUT; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.INCLUDE_PARTITIONED_TABLE; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.PG_PORT; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY; +import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_STARTUP_MODE; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME; @@ -118,7 +118,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); boolean isScanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY); - boolean includePartitionedTable = config.get(INCLUDE_PARTITIONED_TABLE); + boolean publishViaPartitionRoot = config.get(SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED); boolean assignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); @@ -168,7 +168,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c isScanNewlyAddedTableEnabled, lsnCommitCheckpointsDelay, assignUnboundedChunkFirst, - includePartitionedTable); + publishViaPartitionRoot); } @Override @@ -212,7 +212,7 @@ public Set> optionalOptions() { options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED); options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY); options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); - options.add(INCLUDE_PARTITIONED_TABLE); + options.add(SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED); return options; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java index c92dc9bb026..e89d57835d9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java @@ -86,7 +86,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe private final boolean scanNewlyAddedTableEnabled; private final int lsnCommitCheckpointsDelay; private final boolean assignUnboundedChunkFirst; - private final boolean includePartitionedTable; + private final boolean publishViaPartitionRoot; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -128,7 +128,7 @@ public PostgreSQLTableSource( boolean isScanNewlyAddedTableEnabled, int lsnCommitCheckpointsDelay, boolean assignUnboundedChunkFirst, - boolean includePartitionedTable) { + boolean publishViaPartitionRoot) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -161,7 +161,7 @@ public PostgreSQLTableSource( this.scanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled; this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay; this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; - this.includePartitionedTable = includePartitionedTable; + this.publishViaPartitionRoot = publishViaPartitionRoot; } @Override @@ -225,7 +225,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay) .assignUnboundedChunkFirst(assignUnboundedChunkFirst) - .includePartitionedTable(includePartitionedTable) + .publishViaPartitionRoot(publishViaPartitionRoot) .build(); return SourceProvider.of(parallelSource); } else { @@ -296,7 +296,7 @@ public DynamicTableSource copy() { scanNewlyAddedTableEnabled, lsnCommitCheckpointsDelay, assignUnboundedChunkFirst, - includePartitionedTable); + publishViaPartitionRoot); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java index 957ca2cc48e..dd41e36b850 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java @@ -106,7 +106,7 @@ void testDiscoverDataCollectionsForPartitionedTable() { PostgresSourceConfigFactory configFactoryOfInventoryPartitionedDatabase = getMockPostgresSourceConfigFactory( inventoryPartitionedDatabase, "inventory_partitioned", "products", 10); - configFactoryOfInventoryPartitionedDatabase.setIncludePartitionedTable(true); + configFactoryOfInventoryPartitionedDatabase.setPublishViaPartitionRoot(true); PostgresDialect dialectOfInventoryPartitionedDatabase = new PostgresDialect(configFactoryOfInventoryPartitionedDatabase.create(0)); List tableIdsOfInventoryPartitionedDatabase = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java index 27d364130b3..8e1e4cc884f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java @@ -67,7 +67,7 @@ public MockPostgreSQLTableSource(PostgreSQLTableSource postgreSQLTableSource) { (boolean) get(postgreSQLTableSource, "scanNewlyAddedTableEnabled"), (int) get(postgreSQLTableSource, "lsnCommitCheckpointsDelay"), (boolean) get(postgreSQLTableSource, "assignUnboundedChunkFirst"), - (boolean) get(postgreSQLTableSource, "includePartitionedTable")); + (boolean) get(postgreSQLTableSource, "publishViaPartitionRoot")); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index d9d61984534..0b179bcb633 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -271,7 +271,7 @@ void testConsumingAllEventsForPartitionedTable(boolean parallelismSnapshot) + " 'schema-name' = '%s'," + " 'table-name' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'," - + " 'include-partitioned-table' = 'true'," + + " 'scan.publish-via-partition-root.enabled' = 'true'," + " 'decoding.plugin.name' = 'pgoutput', " + " 'debezium.publication.name' = '%s'," + " 'slot.name' = '%s'" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java index 64ec2b50c35..03e93d14bef 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java @@ -63,8 +63,8 @@ import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.INCLUDE_PARTITIONED_TABLE; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY; +import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED; import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction; /** Test for {@link PostgreSQLTableSource} created by {@link PostgreSQLTableFactory}. */ @@ -155,7 +155,7 @@ void testCommonProperties() { SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(), SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), - INCLUDE_PARTITIONED_TABLE.defaultValue()); + SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -203,7 +203,7 @@ void testOptionalProperties() { true, SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), - INCLUDE_PARTITIONED_TABLE.defaultValue()); + SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -249,7 +249,7 @@ void testMetadataColumns() { SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(), SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), - INCLUDE_PARTITIONED_TABLE.defaultValue()); + SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED.defaultValue()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name"); @@ -305,7 +305,7 @@ void testEnableParallelReadSource() { SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(), SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), - INCLUDE_PARTITIONED_TABLE.defaultValue()); + SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -351,7 +351,7 @@ void testStartupFromLatestOffset() { SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(), SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), - INCLUDE_PARTITIONED_TABLE.defaultValue()); + SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } From f7104b3fda46693aa1b0e0a2a3abd02d0c3f0bdf Mon Sep 17 00:00:00 2001 From: Vinh Pham Date: Tue, 3 Jun 2025 21:08:15 +0100 Subject: [PATCH 5/6] [FLINK-37479][postgres] Fix comments missing period --- .../cdc/connectors/postgres/source/PostgresSourceBuilder.java | 2 +- .../postgres/source/config/PostgresSourceConfigFactory.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java index 030b7763aaf..d675b55e5e9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java @@ -289,7 +289,7 @@ public PostgresSourceBuilder lsnCommitCheckpointsDelay(int lsnCommitDelay) { return this; } - /** Whether the connector should read partitioned tables via partition root or not */ + /** Whether the connector should read partitioned tables via partition root or not. */ public PostgresSourceBuilder publishViaPartitionRoot(boolean publishViaPartitionRoot) { this.configFactory.setPublishViaPartitionRoot(publishViaPartitionRoot); return this; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java index ab9a15b12c6..d18ed0bd43d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java @@ -186,7 +186,7 @@ public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) { this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay; } - /** Enable include partitioned table */ + /** Enable include partitioned table. */ public void setPublishViaPartitionRoot(boolean publishViaPartitionRoot) { this.publishViaPartitionRoot = publishViaPartitionRoot; } From 3d0a2c944c68516f7eb333f07cb9a6ebea62a4dd Mon Sep 17 00:00:00 2001 From: Vinh Pham Date: Sat, 2 Aug 2025 23:55:21 +0100 Subject: [PATCH 6/6] [FLINK-37479][postgres] Rename configuration from feedback --- .../connectors/flink-sources/postgres-cdc.md | 24 +++++++++---------- .../connectors/flink-sources/postgres-cdc.md | 2 +- .../reader/PostgresPipelineRecordEmitter.java | 3 ++- .../postgres/source/PostgresDialect.java | 6 ++--- .../source/PostgresSourceBuilder.java | 4 ++-- .../source/config/PostgresSourceConfig.java | 12 +++++----- .../config/PostgresSourceConfigFactory.java | 8 +++---- .../source/config/PostgresSourceOptions.java | 4 ++-- .../source/utils/TableDiscoveryUtils.java | 4 ++-- .../table/PostgreSQLTableFactory.java | 8 +++---- .../postgres/table/PostgreSQLTableSource.java | 16 +++++++------ .../postgres/source/PostgresDialectTest.java | 2 +- .../table/MockPostgreSQLTableSource.java | 2 +- .../table/PostgreSQLConnectorITCase.java | 2 +- .../table/PostgreSQLTableFactoryTest.java | 12 +++++----- 15 files changed, 56 insertions(+), 53 deletions(-) diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md index 382f0180497..b4116042207 100644 --- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md @@ -281,6 +281,18 @@ Connector Options
  • false (default): All types of messages are sent as is.
  • + + scan.include-partitioned-tables.enabled + optional + false + Boolean + + Whether to enable reading partitioned tables via partition root.
    + If enabled: + (1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true + (2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice. + + @@ -391,18 +403,6 @@ The following options is available only when `scan.incremental.snapshot.enabled= The table chunks would use evenly calculation optimization when the data distribution is even, and the query for splitting would happen when it is uneven. The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount. - - scan.publish-via-partition-root.enabled - optional - false - Boolean - - Whether to enable reading partitioned tables via partition root.
    - If enabled: - (1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true - (2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice. - - diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md index 91a2363bcd1..bec5e14d89a 100644 --- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md @@ -279,7 +279,7 @@ SELECT * FROM shipments; - scan.publish-via-partition-root.enabled + scan.include-partitioned-tables.enabled optional false Boolean diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java index f3540ddd4a2..3faddcf94c6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java @@ -130,7 +130,8 @@ private void generateCreateTableEvent(PostgresSourceConfig sourceConfig) { TableDiscoveryUtils.listTables( sourceConfig.getDatabaseList().get(0), jdbc, - sourceConfig.getTableFilters()); + sourceConfig.getTableFilters(), + sourceConfig.includePartitionedTables()); for (TableId tableId : capturedTableIds) { Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc); createTableEventCache.add( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java index c0b16104e44..1c99f651cd2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java @@ -176,14 +176,14 @@ public ChunkSplitter createChunkSplitter( @Override public List discoverDataCollections(JdbcSourceConfig sourceConfig) { try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { - boolean publishViaPartitionRoot = - ((PostgresSourceConfig) sourceConfig).getPublishViaPartitionRoot(); + boolean includePartitionedTables = + ((PostgresSourceConfig) sourceConfig).includePartitionedTables(); return TableDiscoveryUtils.listTables( // there is always a single database provided sourceConfig.getDatabaseList().get(0), jdbc, sourceConfig.getTableFilters(), - publishViaPartitionRoot); + includePartitionedTables); } catch (SQLException e) { throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java index 2d472cb20ad..bbd65b5e9ab 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java @@ -300,8 +300,8 @@ public PostgresSourceBuilder lsnCommitCheckpointsDelay(int lsnCommitDelay) { } /** Whether the connector should read partitioned tables via partition root or not. */ - public PostgresSourceBuilder publishViaPartitionRoot(boolean publishViaPartitionRoot) { - this.configFactory.setPublishViaPartitionRoot(publishViaPartitionRoot); + public PostgresSourceBuilder includePartitionedTables(boolean includePartitionedTables) { + this.configFactory.setIncludePartitionedTables(includePartitionedTables); return this; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java index df7d69cf50c..30271612800 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java @@ -38,7 +38,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig { private final int subtaskId; private final int lsnCommitCheckpointsDelay; - private final boolean publishViaPartitionRoot; + private final boolean includePartitionedTables; public PostgresSourceConfig( int subtaskId, @@ -69,7 +69,7 @@ public PostgresSourceConfig( boolean isScanNewlyAddedTableEnabled, int lsnCommitCheckpointsDelay, boolean assignUnboundedChunkFirst, - boolean publishViaPartitionRoot) { + boolean includePartitionedTables) { super( startupOptions, databaseList, @@ -99,7 +99,7 @@ public PostgresSourceConfig( assignUnboundedChunkFirst); this.subtaskId = subtaskId; this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay; - this.publishViaPartitionRoot = publishViaPartitionRoot; + this.includePartitionedTables = includePartitionedTables; } /** @@ -121,12 +121,12 @@ public int getLsnCommitCheckpointsDelay() { } /** - * Returns {@code publishViaPartitionRoot} value. + * Returns {@code includePartitionedTables} value. * * @return include partitioned table */ - public boolean getPublishViaPartitionRoot() { - return publishViaPartitionRoot; + public boolean includePartitionedTables() { + return includePartitionedTables; } /** diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java index 7748dad273f..670d4f37a56 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java @@ -52,7 +52,7 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory { private int lsnCommitCheckpointsDelay; - private boolean publishViaPartitionRoot; + private boolean includePartitionedTables; /** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */ @Override @@ -136,7 +136,7 @@ public PostgresSourceConfig create(int subtaskId) { scanNewlyAddedTableEnabled, lsnCommitCheckpointsDelay, assignUnboundedChunkFirst, - publishViaPartitionRoot); + includePartitionedTables); } /** @@ -186,7 +186,7 @@ public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) { } /** Enable include partitioned table. */ - public void setPublishViaPartitionRoot(boolean publishViaPartitionRoot) { - this.publishViaPartitionRoot = publishViaPartitionRoot; + public void setIncludePartitionedTables(boolean includePartitionedTables) { + this.includePartitionedTables = includePartitionedTables; } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java index f4f43b8dad3..f498c264532 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java @@ -88,8 +88,8 @@ public class PostgresSourceOptions extends JdbcSourceOptions { + "committed after multiple checkpoint delays instead of after each checkpoint completion.\n" + "This allows continuous recycle of log files in stream phase."); - public static final ConfigOption SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED = - ConfigOptions.key("scan.publish-via-partition-root.enabled") + public static final ConfigOption SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED = + ConfigOptions.key("scan.include-partitioned-tables.enabled") .booleanType() .defaultValue(Boolean.FALSE) .withDescription( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java index f25f40d2694..2ff3e66d207 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java @@ -37,11 +37,11 @@ public static List listTables( String database, JdbcConnection jdbc, RelationalTableFilters tableFilters, - boolean publishViaPartitionRoot) + boolean includePartitionedTables) throws SQLException { String[] tableTypes = new String[] {"TABLE"}; - if (publishViaPartitionRoot) { + if (includePartitionedTables) { tableTypes = new String[] {"TABLE", "PARTITIONED TABLE"}; } Set allTableIds = jdbc.readTableNames(database, null, null, tableTypes); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java index 68b1fe22dff..876f04a83bf 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java @@ -55,11 +55,11 @@ import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.PG_PORT; +import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_STARTUP_MODE; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME; @@ -118,7 +118,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); boolean isScanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY); - boolean publishViaPartitionRoot = config.get(SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED); + boolean includePartitionedTables = config.get(SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED); boolean assignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); boolean appendOnly = config.get(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); @@ -170,7 +170,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c lsnCommitCheckpointsDelay, assignUnboundedChunkFirst, appendOnly, - publishViaPartitionRoot); + includePartitionedTables); } @Override @@ -215,7 +215,7 @@ public Set> optionalOptions() { options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY); options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); options.add(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); - options.add(SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED); + options.add(SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED); return options; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java index 80dffbaae7f..7ebb43f9b4e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java @@ -87,7 +87,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe private final int lsnCommitCheckpointsDelay; private final boolean assignUnboundedChunkFirst; private final boolean appendOnly; - private final boolean publishViaPartitionRoot; + private final boolean includePartitionedTables; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -130,7 +130,7 @@ public PostgreSQLTableSource( int lsnCommitCheckpointsDelay, boolean assignUnboundedChunkFirst, boolean appendOnly, - boolean publishViaPartitionRoot) { + boolean includePartitionedTables) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -164,7 +164,7 @@ public PostgreSQLTableSource( this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay; this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; this.appendOnly = appendOnly; - this.publishViaPartitionRoot = publishViaPartitionRoot; + this.includePartitionedTables = includePartitionedTables; } @Override @@ -233,7 +233,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay) .assignUnboundedChunkFirst(assignUnboundedChunkFirst) - .publishViaPartitionRoot(publishViaPartitionRoot) + .includePartitionedTables(includePartitionedTables) .build(); return SourceProvider.of(parallelSource); } else { @@ -305,7 +305,7 @@ public DynamicTableSource copy() { lsnCommitCheckpointsDelay, assignUnboundedChunkFirst, appendOnly, - publishViaPartitionRoot); + includePartitionedTables); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -350,7 +350,8 @@ public boolean equals(Object o) { && Objects.equals(skipSnapshotBackfill, that.skipSnapshotBackfill) && Objects.equals(scanNewlyAddedTableEnabled, that.scanNewlyAddedTableEnabled) && Objects.equals(assignUnboundedChunkFirst, that.assignUnboundedChunkFirst) - && Objects.equals(appendOnly, that.appendOnly); + && Objects.equals(appendOnly, that.appendOnly) + && Objects.equals(includePartitionedTables, that.includePartitionedTables); } @Override @@ -386,7 +387,8 @@ public int hashCode() { skipSnapshotBackfill, scanNewlyAddedTableEnabled, assignUnboundedChunkFirst, - appendOnly); + appendOnly, + includePartitionedTables); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java index dd41e36b850..0fb9e800743 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java @@ -106,7 +106,7 @@ void testDiscoverDataCollectionsForPartitionedTable() { PostgresSourceConfigFactory configFactoryOfInventoryPartitionedDatabase = getMockPostgresSourceConfigFactory( inventoryPartitionedDatabase, "inventory_partitioned", "products", 10); - configFactoryOfInventoryPartitionedDatabase.setPublishViaPartitionRoot(true); + configFactoryOfInventoryPartitionedDatabase.setIncludePartitionedTables(true); PostgresDialect dialectOfInventoryPartitionedDatabase = new PostgresDialect(configFactoryOfInventoryPartitionedDatabase.create(0)); List tableIdsOfInventoryPartitionedDatabase = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java index 20c84e84142..ecd049b6f19 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java @@ -68,7 +68,7 @@ public MockPostgreSQLTableSource(PostgreSQLTableSource postgreSQLTableSource) { (int) get(postgreSQLTableSource, "lsnCommitCheckpointsDelay"), (boolean) get(postgreSQLTableSource, "assignUnboundedChunkFirst"), (boolean) get(postgreSQLTableSource, "appendOnly"), - (boolean) get(postgreSQLTableSource, "publishViaPartitionRoot")); + (boolean) get(postgreSQLTableSource, "includePartitionedTables")); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 68cdc64f0e4..7f4329be082 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -271,7 +271,7 @@ void testConsumingAllEventsForPartitionedTable(boolean parallelismSnapshot) + " 'schema-name' = '%s'," + " 'table-name' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'," - + " 'scan.publish-via-partition-root.enabled' = 'true'," + + " 'scan.include-partitioned-tables.enabled' = 'true'," + " 'decoding.plugin.name' = 'pgoutput', " + " 'debezium.publication.name' = '%s'," + " 'slot.name' = '%s'" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java index 11e051f83d0..4a4cf4d4027 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java @@ -64,8 +64,8 @@ import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL; +import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED; import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction; /** Test for {@link PostgreSQLTableSource} created by {@link PostgreSQLTableFactory}. */ @@ -157,7 +157,7 @@ void testCommonProperties() { SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.defaultValue(), - SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED.defaultValue()); + SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -207,7 +207,7 @@ void testOptionalProperties() { SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), true, - SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED.defaultValue()); + SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -254,7 +254,7 @@ void testMetadataColumns() { SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.defaultValue(), - SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED.defaultValue()); + SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED.defaultValue()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name"); @@ -311,7 +311,7 @@ void testEnableParallelReadSource() { SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.defaultValue(), - SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED.defaultValue()); + SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -358,7 +358,7 @@ void testStartupFromLatestOffset() { SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(), SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.defaultValue(), - SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED.defaultValue()); + SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); }