Skip to content

Commit 1c0c5af

Browse files
linjianchanglinjc13
andauthored
[FLINK-37634][pipeline-connector/mysql]When the table name of mysql supports size sensitive, fix the error that table name not being found. (#3996)
Co-authored-by: linjc13 <linjc13@chinatelecom.cn>
1 parent 1454b36 commit 1c0c5af

File tree

8 files changed

+376
-13
lines changed

8 files changed

+376
-13
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
3030
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter;
3131
import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata;
32+
import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
3233
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
3334

3435
import io.debezium.relational.RelationalDatabaseConnectorConfig;
@@ -72,7 +73,8 @@ public EventSourceProvider getEventSourceProvider() {
7273
sourceConfig.isIncludeSchemaChanges(),
7374
readableMetadataList,
7475
includeComments,
75-
sourceConfig.isTreatTinyInt1AsBoolean());
76+
sourceConfig.isTreatTinyInt1AsBoolean(),
77+
MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig));
7678

7779
MySqlSource<Event> source =
7880
new MySqlSource<>(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,37 +69,45 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
6969
private transient CustomMySqlAntlrDdlParser customParser;
7070

7171
private List<MySqlReadableMetadata> readableMetadataList;
72+
private boolean isTableIdCaseInsensitive;
7273

7374
public MySqlEventDeserializer(
7475
DebeziumChangelogMode changelogMode,
7576
boolean includeSchemaChanges,
76-
boolean tinyInt1isBit) {
77+
boolean tinyInt1isBit,
78+
boolean isTableIdCaseInsensitive) {
7779
this(
7880
changelogMode,
7981
includeSchemaChanges,
8082
new ArrayList<>(),
8183
includeSchemaChanges,
82-
tinyInt1isBit);
84+
tinyInt1isBit,
85+
isTableIdCaseInsensitive);
86+
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
8387
}
8488

8589
public MySqlEventDeserializer(
8690
DebeziumChangelogMode changelogMode,
8791
boolean includeSchemaChanges,
8892
List<MySqlReadableMetadata> readableMetadataList,
8993
boolean includeComments,
90-
boolean tinyInt1isBit) {
94+
boolean tinyInt1isBit,
95+
boolean isTableIdCaseInsensitive) {
9196
super(new MySqlSchemaDataTypeInference(), changelogMode);
9297
this.includeSchemaChanges = includeSchemaChanges;
9398
this.readableMetadataList = readableMetadataList;
9499
this.includeComments = includeComments;
95100
this.tinyInt1isBit = tinyInt1isBit;
101+
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
96102
}
97103

98104
@Override
99105
protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
100106
if (includeSchemaChanges) {
101107
if (customParser == null) {
102-
customParser = new CustomMySqlAntlrDdlParser(includeComments, tinyInt1isBit);
108+
customParser =
109+
new CustomMySqlAntlrDdlParser(
110+
includeComments, tinyInt1isBit, isTableIdCaseInsensitive);
103111
tables = new Tables();
104112
}
105113

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.HashMap;
4646
import java.util.LinkedList;
4747
import java.util.List;
48+
import java.util.Locale;
4849
import java.util.Map;
4950
import java.util.stream.Collectors;
5051

@@ -65,18 +66,20 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
6566
private List<ColumnEditor> columnEditors;
6667
private CustomColumnDefinitionParserListener columnDefinitionListener;
6768
private TableEditor tableEditor;
68-
69+
private boolean isTableIdCaseInsensitive;
6970
private int parsingColumnIndex = STARTING_INDEX;
7071

7172
public CustomAlterTableParserListener(
7273
MySqlAntlrDdlParser parser,
7374
List<ParseTreeListener> listeners,
7475
LinkedList<SchemaChangeEvent> changes,
75-
boolean tinyInt1isBit) {
76+
boolean tinyInt1isBit,
77+
boolean isTableIdCaseInsensitive) {
7678
this.parser = parser;
7779
this.listeners = listeners;
7880
this.changes = changes;
7981
this.tinyInt1isBit = tinyInt1isBit;
82+
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
8083
}
8184

8285
@Override
@@ -438,7 +441,13 @@ private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn)
438441
}
439442

440443
private org.apache.flink.cdc.common.event.TableId toCdcTableId(TableId dbzTableId) {
441-
return org.apache.flink.cdc.common.event.TableId.tableId(
442-
dbzTableId.catalog(), dbzTableId.table());
444+
if (isTableIdCaseInsensitive) {
445+
return org.apache.flink.cdc.common.event.TableId.tableId(
446+
dbzTableId.catalog().toLowerCase(Locale.ROOT),
447+
dbzTableId.table().toLowerCase(Locale.ROOT));
448+
} else {
449+
return org.apache.flink.cdc.common.event.TableId.tableId(
450+
dbzTableId.catalog(), dbzTableId.table());
451+
}
443452
}
444453
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,14 @@ public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {
3636

3737
private final LinkedList<SchemaChangeEvent> parsedEvents;
3838
private final boolean tinyInt1isBit;
39+
private boolean isTableIdCaseInsensitive;
3940

40-
public CustomMySqlAntlrDdlParser(boolean includeComments, boolean tinyInt1isBit) {
41+
public CustomMySqlAntlrDdlParser(
42+
boolean includeComments, boolean tinyInt1isBit, boolean isTableIdCaseInsensitive) {
4143
super(true, false, includeComments, null, Tables.TableFilter.includeAll());
4244
this.parsedEvents = new LinkedList<>();
4345
this.tinyInt1isBit = tinyInt1isBit;
46+
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
4447
}
4548

4649
// Overriding this method because the BIT type requires default length dimension of 1.
@@ -280,7 +283,8 @@ protected DataTypeResolver initializeDataTypeResolver() {
280283

281284
@Override
282285
protected AntlrDdlParserListener createParseTreeWalkerListener() {
283-
return new CustomMySqlAntlrDdlParserListener(this, parsedEvents, tinyInt1isBit);
286+
return new CustomMySqlAntlrDdlParserListener(
287+
this, parsedEvents, tinyInt1isBit, isTableIdCaseInsensitive);
284288
}
285289

286290
public List<SchemaChangeEvent> getAndClearParsedEvents() {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,15 @@ public class CustomMySqlAntlrDdlParserListener extends MySqlParserBaseListener
7676
public CustomMySqlAntlrDdlParserListener(
7777
MySqlAntlrDdlParser parser,
7878
LinkedList<SchemaChangeEvent> parsedEvents,
79-
boolean tinyInt1isBit) {
79+
boolean tinyInt1isBit,
80+
boolean isTableIdCaseInsensitive) {
8081
// initialize listeners
8182
listeners.add(new CreateAndAlterDatabaseParserListener(parser));
8283
listeners.add(new DropDatabaseParserListener(parser));
8384
listeners.add(new CreateTableParserListener(parser, listeners));
8485
listeners.add(
85-
new CustomAlterTableParserListener(parser, listeners, parsedEvents, tinyInt1isBit));
86+
new CustomAlterTableParserListener(
87+
parser, listeners, parsedEvents, tinyInt1isBit, isTableIdCaseInsensitive));
8688
listeners.add(new DropTableParserListener(parser));
8789
listeners.add(new RenameTableParserListener(parser));
8890
listeners.add(new TruncateTableParserListener(parser));

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,5 +158,13 @@ public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {
158158
tableId.getSchemaName(), null, tableId.getTableName());
159159
}
160160

161+
public static boolean isTableIdCaseInsensitive(MySqlSourceConfig sourceConfig) {
162+
try (MySqlConnection jdbc = createMySqlConnection(sourceConfig)) {
163+
return jdbc.isTableIdCaseSensitive();
164+
} catch (Exception e) {
165+
throw new RuntimeException("Error to get table id caseSensitive: " + e.getMessage(), e);
166+
}
167+
}
168+
161169
private MySqlSchemaUtils() {}
162170
}

0 commit comments

Comments
 (0)