Skip to content

Commit 6788ff5

Browse files
tchivslvyanquan
andauthored
[FLINK-38188][pipeline-connector][postgres] Fix database name validation logic in PostgresDataSourceFactory (#4075)
--------- Co-authored-by: lvyanquan <lvyanquan.lyq@alibaba-inc.com>
1 parent b937fdc commit 6788ff5

File tree

3 files changed

+61
-16
lines changed

3 files changed

+61
-16
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -369,23 +369,19 @@ private Optional<String> getValidateDatabaseName(String tables) {
369369
String.format(
370370
"Tables format must db.schema.table, can not 'tables' = %s",
371371
TABLES.key()));
372-
if (tableNameParts.length == 3) {
373-
String currentDbName = tableNameParts[0];
372+
String currentDbName = tableNameParts[0];
374373

374+
checkState(
375+
isValidPostgresDbName(currentDbName),
376+
String.format("%s is not a valid PostgreSQL database name", currentDbName));
377+
if (dbName == null) {
378+
dbName = currentDbName;
379+
} else {
375380
checkState(
376-
isValidPostgresDbName(currentDbName),
377-
String.format(
378-
"The value of option %s does not conform to PostgresSQL database name naming conventions",
379-
TABLES.key()));
380-
if (dbName == null) {
381-
dbName = currentDbName;
382-
} else {
383-
checkState(
384-
!dbName.equals(currentDbName),
385-
String.format(
386-
"The value of option %s all table names must have the same database name",
387-
TABLES.key()));
388-
}
381+
dbName.equals(currentDbName),
382+
"The value of option `%s` is `%s`, but not all table names have the same database name",
383+
TABLES.key(),
384+
String.join(",", tableNames));
389385
}
390386
}
391387

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,54 @@ public void testPrefixRequireOption() {
257257
.isEqualTo(Arrays.asList("inventory.products"));
258258
}
259259

260+
@Test
261+
public void testTableValidationWithDifferentDatabases() {
262+
Map<String, String> options = new HashMap<>();
263+
options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost());
264+
options.put(
265+
PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)));
266+
options.put(USERNAME.key(), TEST_USER);
267+
options.put(PASSWORD.key(), TEST_PASSWORD);
268+
options.put(
269+
TABLES.key(),
270+
"aia_test.public.aia_t_icc_jjdb,different_db.public.aia_t_icc_jjdb_extend");
271+
options.put(SLOT_NAME.key(), slotName);
272+
273+
PostgresDataSourceFactory factory = new PostgresDataSourceFactory();
274+
Factory.Context context = new MockContext(Configuration.fromMap(options));
275+
276+
assertThatThrownBy(() -> factory.createDataSource(context))
277+
.isInstanceOf(IllegalStateException.class)
278+
.hasMessageContaining(
279+
"The value of option `tables` is `aia_test.public.aia_t_icc_jjdb,different_db.public.aia_t_icc_jjdb_extend`, but not all table names have the same database name");
280+
}
281+
282+
@Test
283+
public void testTableValidationWithOriginalBugScenario() {
284+
Map<String, String> options = new HashMap<>();
285+
options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost());
286+
options.put(
287+
PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)));
288+
options.put(USERNAME.key(), TEST_USER);
289+
options.put(PASSWORD.key(), TEST_PASSWORD);
290+
String tables =
291+
POSTGRES_CONTAINER.getDatabaseName()
292+
+ ".public.aia_t_icc_jjdb,"
293+
+ POSTGRES_CONTAINER.getDatabaseName()
294+
+ ".public.aia_t_icc_jjdb_\\\\d{6},"
295+
+ POSTGRES_CONTAINER.getDatabaseName()
296+
+ ".public.aia_t_icc_jjdb_extend";
297+
options.put(TABLES.key(), tables);
298+
options.put(SLOT_NAME.key(), slotName);
299+
300+
PostgresDataSourceFactory factory = new PostgresDataSourceFactory();
301+
Factory.Context context = new MockContext(Configuration.fromMap(options));
302+
303+
assertThatThrownBy(() -> factory.createDataSource(context))
304+
.isInstanceOf(IllegalArgumentException.class)
305+
.hasMessageContaining("Cannot find any table by the option 'tables'");
306+
}
307+
260308
class MockContext implements Factory.Context {
261309

262310
Configuration factoryConfiguration;

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ void testSyncWholeDatabase() throws Exception {
111111
+ " port: %d\n"
112112
+ " username: %s\n"
113113
+ " password: %s\n"
114-
+ " tables: %s.inventory.\\.*\n"
114+
+ " tables: %s.inventory.products,%s.inventory.customers\n"
115115
+ " slot.name: %s\n"
116116
+ " scan.startup.mode: initial\n"
117117
+ " server-time-zone: UTC\n"
@@ -127,6 +127,7 @@ void testSyncWholeDatabase() throws Exception {
127127
POSTGRES_TEST_USER,
128128
POSTGRES_TEST_PASSWORD,
129129
postgresInventoryDatabase.getDatabaseName(),
130+
postgresInventoryDatabase.getDatabaseName(),
130131
slotName,
131132
parallelism);
132133
Path postgresCdcJar = TestUtils.getResource("postgres-cdc-pipeline-connector.jar");

0 commit comments

Comments
 (0)