diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java index ca1b972af83..7786486ccf4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java @@ -126,7 +126,7 @@ public static Schema getTableSchema( MySqlConnection jdbc) { // fetch table schemas try (MySqlSchema mySqlSchema = - new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) { + new MySqlSchema(sourceConfig, jdbc.isTableIdCaseInsensitive())) { TableChanges.TableChange tableSchema = mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId)); return toSchema(tableSchema.getTable(), sourceConfig.isTreatTinyInt1AsBoolean()); @@ -160,7 +160,7 @@ public static io.debezium.relational.TableId toDbzTableId(TableId tableId) { public static boolean isTableIdCaseInsensitive(MySqlSourceConfig sourceConfig) { try (MySqlConnection jdbc = createMySqlConnection(sourceConfig)) { - return jdbc.isTableIdCaseSensitive(); + return jdbc.isTableIdCaseInsensitive(); } catch (Exception e) { throw new RuntimeException("Error to get table id caseSensitive: " + e.getMessage(), e); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java index 63fa7e9302c..e1effd1d30f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java @@ -67,7 +67,7 @@ default Offset displayCommittedOffset(C sourceConfig) { } /** Check if the CollectionId is case-sensitive or not. */ - boolean isDataCollectionIdCaseSensitive(C sourceConfig); + boolean isDataCollectionIdCaseInsensitive(C sourceConfig); /** Returns the {@link ChunkSplitter} which used to split collection to splits. */ @Deprecated diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/IncrementalSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/IncrementalSource.java index dfee377562f..8635ac325ee 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/IncrementalSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/IncrementalSource.java @@ -151,14 +151,14 @@ public SplitEnumerator createEnumerator( try { final List remainingTables = dataSourceDialect.discoverDataCollections(sourceConfig); - boolean isTableIdCaseSensitive = - dataSourceDialect.isDataCollectionIdCaseSensitive(sourceConfig); + boolean isTableIdCaseInsensitive = + dataSourceDialect.isDataCollectionIdCaseInsensitive(sourceConfig); splitAssigner = new HybridSplitAssigner<>( sourceConfig, enumContext.currentParallelism(), remainingTables, - isTableIdCaseSensitive, + isTableIdCaseInsensitive, dataSourceDialect, offsetFactory, enumContext); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java index a4d21ada6f2..8738c948d53 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java @@ -71,7 +71,7 @@ public HybridSplitAssigner( C sourceConfig, int currentParallelism, List remainingTables, - boolean isTableIdCaseSensitive, + boolean isTableIdCaseInsensitive, DataSourceDialect dialect, OffsetFactory offsetFactory, SplitEnumeratorContext enumeratorContext) { @@ -81,7 +81,7 @@ public HybridSplitAssigner( sourceConfig, currentParallelism, remainingTables, - isTableIdCaseSensitive, + isTableIdCaseInsensitive, dialect, offsetFactory), false, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java index 0b1a4100bc7..b7e3bec62ba 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java @@ -84,7 +84,7 @@ public class SnapshotSplitAssigner implements SplitAssig private final boolean isRemainingTablesCheckpointed; private ChunkSplitter chunkSplitter; - private boolean isTableIdCaseSensitive; + private boolean isTableIdCaseInsensitive; @Nullable private Long checkpointIdToFinish; private final DataSourceDialect dialect; @@ -102,7 +102,7 @@ public SnapshotSplitAssigner( C sourceConfig, int currentParallelism, List remainingTables, - boolean isTableIdCaseSensitive, + boolean isTableIdCaseInsensitive, DataSourceDialect dialect, OffsetFactory offsetFactory) { this( @@ -115,7 +115,7 @@ public SnapshotSplitAssigner( new HashMap<>(), INITIAL_ASSIGNING, remainingTables, - isTableIdCaseSensitive, + isTableIdCaseInsensitive, true, dialect, offsetFactory, @@ -139,7 +139,7 @@ public SnapshotSplitAssigner( checkpoint.getSplitFinishedOffsets(), checkpoint.getSnapshotAssignerStatus(), checkpoint.getRemainingTables(), - checkpoint.isTableIdCaseSensitive(), + checkpoint.isTableIdCaseInsensitive(), checkpoint.isRemainingTablesCheckpointed(), dialect, offsetFactory, @@ -157,7 +157,7 @@ private SnapshotSplitAssigner( Map splitFinishedOffsets, AssignerStatus assignerStatus, List remainingTables, - boolean isTableIdCaseSensitive, + boolean isTableIdCaseInsensitive, boolean isRemainingTablesCheckpointed, DataSourceDialect dialect, OffsetFactory offsetFactory, @@ -183,7 +183,7 @@ private SnapshotSplitAssigner( this.assignerStatus = assignerStatus; this.remainingTables = new CopyOnWriteArrayList<>(remainingTables); this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; - this.isTableIdCaseSensitive = isTableIdCaseSensitive; + this.isTableIdCaseInsensitive = isTableIdCaseInsensitive; this.dialect = dialect; this.offsetFactory = offsetFactory; this.splitFinishedCheckpointIds = splitFinishedCheckpointIds; @@ -524,7 +524,7 @@ public SnapshotPendingSplitsState snapshotState(long checkpointId) { splitFinishedOffsets, assignerStatus, remainingTables, - isTableIdCaseSensitive, + isTableIdCaseInsensitive, true, splitFinishedCheckpointIds, chunkSplitter.snapshotState(checkpointId)); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java index c823c74f2b5..2f5da6217e1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java @@ -175,7 +175,7 @@ private void serializeSnapshotPendingSplitsState( writeFinishedOffsets(state.getSplitFinishedOffsets(), out); out.writeInt(state.getSnapshotAssignerStatus().getStatusCode()); writeTableIds(state.getRemainingTables(), out); - out.writeBoolean(state.isTableIdCaseSensitive()); + out.writeBoolean(state.isTableIdCaseInsensitive()); writeTableSchemas(state.getTableSchemas(), out); writeSplitFinishedCheckpointIds(state.getSplitFinishedCheckpointIds(), out); @@ -283,7 +283,7 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState( } } List remainingTableIds = readTableIds(version, in); - boolean isTableIdCaseSensitive = in.readBoolean(); + boolean isTableIdCaseInsensitive = in.readBoolean(); final List remainingSchemalessSplits = new ArrayList<>(); final Map assignedSchemalessSnapshotSplits = new HashMap<>(); @@ -333,7 +333,7 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState( finishedOffsets, assignerStatus, remainingTableIds, - isTableIdCaseSensitive, + isTableIdCaseInsensitive, true, splitFinishedCheckpointIds, chunkSplitterState); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java index 367bc792d14..21ae814bc5b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java @@ -61,7 +61,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState { private final AssignerStatus assignerStatus; /** Whether the table identifier is case sensitive. */ - private final boolean isTableIdCaseSensitive; + private final boolean isTableIdCaseInsensitive; /** Whether the remaining tables are keep when snapshot state. */ private final boolean isRemainingTablesCheckpointed; @@ -85,7 +85,7 @@ public SnapshotPendingSplitsState( Map splitFinishedOffsets, AssignerStatus assignerStatus, List remainingTables, - boolean isTableIdCaseSensitive, + boolean isTableIdCaseInsensitive, boolean isRemainingTablesCheckpointed, Map splitFinishedCheckpointIds, ChunkSplitterState chunkSplitterState) { @@ -95,7 +95,7 @@ public SnapshotPendingSplitsState( this.splitFinishedOffsets = splitFinishedOffsets; this.assignerStatus = assignerStatus; this.remainingTables = remainingTables; - this.isTableIdCaseSensitive = isTableIdCaseSensitive; + this.isTableIdCaseInsensitive = isTableIdCaseInsensitive; this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; this.tableSchemas = tableSchemas; this.chunkSplitterState = chunkSplitterState; @@ -134,8 +134,8 @@ public List getRemainingTables() { return remainingTables; } - public boolean isTableIdCaseSensitive() { - return isTableIdCaseSensitive; + public boolean isTableIdCaseInsensitive() { + return isTableIdCaseInsensitive; } public boolean isRemainingTablesCheckpointed() { @@ -156,7 +156,7 @@ public boolean equals(Object o) { } SnapshotPendingSplitsState that = (SnapshotPendingSplitsState) o; return Objects.equals(assignerStatus, that.assignerStatus) - && isTableIdCaseSensitive == that.isTableIdCaseSensitive + && isTableIdCaseInsensitive == that.isTableIdCaseInsensitive && isRemainingTablesCheckpointed == that.isRemainingTablesCheckpointed && Objects.equals(remainingTables, that.remainingTables) && Objects.equals(alreadyProcessedTables, that.alreadyProcessedTables) @@ -176,7 +176,7 @@ public int hashCode() { assignedSplits, splitFinishedOffsets, assignerStatus, - isTableIdCaseSensitive, + isTableIdCaseInsensitive, isRemainingTablesCheckpointed, splitFinishedCheckpointIds, chunkSplitterState); @@ -197,8 +197,8 @@ public String toString() { + splitFinishedOffsets + ", assignerStatus=" + assignerStatus - + ", isTableIdCaseSensitive=" - + isTableIdCaseSensitive + + ", isTableIdCaseInsensitive=" + + isTableIdCaseInsensitive + ", isRemainingTablesCheckpointed=" + isRemainingTablesCheckpointed + ", splitFinishedCheckpointIds=" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2Dialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2Dialect.java index 11db3cae8f4..c8afc1a7692 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2Dialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2Dialect.java @@ -76,7 +76,7 @@ public Offset displayCurrentOffset(JdbcSourceConfig sourceConfig) { } @Override - public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) { + public boolean isDataCollectionIdCaseInsensitive(JdbcSourceConfig sourceConfig) { return true; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java index d6ce3a9a13a..7c615c2b396 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java @@ -165,7 +165,7 @@ public ChangeStreamOffset displayCurrentOffset(MongoDBSourceConfig sourceConfig) } @Override - public boolean isDataCollectionIdCaseSensitive(MongoDBSourceConfig sourceConfig) { + public boolean isDataCollectionIdCaseInsensitive(MongoDBSourceConfig sourceConfig) { // MongoDB's database names and collection names are case-sensitive. return true; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java index 801e27d6b89..4ad7ac85063 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java @@ -496,7 +496,7 @@ public OptionalLong getEstimatedTableSize(TableId tableId) { return OptionalLong.empty(); } - public boolean isTableIdCaseSensitive() { + public boolean isTableIdCaseInsensitive() { return !"0" .equals( readMySqlSystemVariables() diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index 7ca60be5bf2..139d8e5b048 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -104,7 +104,7 @@ public static BinaryLogClient createBinaryClient(Configuration dbzConfiguration) /** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas. */ public static MySqlDatabaseSchema createMySqlDatabaseSchema( - MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) { + MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseInsensitive) { TopicSelector topicSelector = MySqlTopicSelector.defaultSelector(dbzMySqlConfig); SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(); MySqlValueConverters valueConverters = getValueConverters(dbzMySqlConfig); @@ -113,7 +113,7 @@ public static MySqlDatabaseSchema createMySqlDatabaseSchema( valueConverters, topicSelector, schemaNameAdjuster, - isTableIdCaseSensitive); + isTableIdCaseInsensitive); } /** Fetch current binlog offsets in MySql Server. */ @@ -208,7 +208,7 @@ public static List discoverCapturedTables( return capturedTableIds; } - public static boolean isTableIdCaseSensitive(JdbcConnection connection) { + public static boolean isTableIdCaseInsensitive(JdbcConnection connection) { return !"0" .equals( readMySqlSystemVariables(connection) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java index 09a5fe8e85c..07dffd4e5de 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java @@ -111,7 +111,7 @@ public StatefulTaskContext( public void configure(MySqlSplit mySqlSplit) { // initial stateful objects - final boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive(); + final boolean tableIdCaseInsensitive = connection.isTableIdCaseInsensitive(); this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig); EmbeddedFlinkDatabaseHistory.registerHistory( sourceConfig diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlSchema.java index 5cc06625a08..e4e8ac0d2f2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlSchema.java @@ -48,10 +48,10 @@ public class MySqlSchema implements AutoCloseable { private final MySqlDatabaseSchema databaseSchema; private final Map schemasByTableId; - public MySqlSchema(MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive) { + public MySqlSchema(MySqlSourceConfig sourceConfig, boolean isTableIdCaseInsensitive) { this.connectorConfig = sourceConfig.getMySqlConnectorConfig(); this.databaseSchema = - DebeziumUtils.createMySqlDatabaseSchema(connectorConfig, isTableIdCaseSensitive); + DebeziumUtils.createMySqlDatabaseSchema(connectorConfig, isTableIdCaseInsensitive); this.schemasByTableId = new HashMap<>(); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java index d68138c07e2..b9452ea3dd8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java @@ -204,13 +204,13 @@ public SplitEnumerator createEnumerator( // In snapshot-only startup option, only split snapshots. if (sourceConfig.getStartupOptions().isSnapshotOnly()) { try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { - boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc); + boolean isTableIdCaseInsensitive = DebeziumUtils.isTableIdCaseInsensitive(jdbc); splitAssigner = new MySqlSnapshotSplitAssigner( sourceConfig, enumContext.currentParallelism(), new ArrayList<>(), - isTableIdCaseSensitive, + isTableIdCaseInsensitive, enumContext); } catch (Exception e) { throw new FlinkRuntimeException( @@ -218,13 +218,13 @@ public SplitEnumerator createEnumerator( } } else if (!sourceConfig.getStartupOptions().isStreamOnly()) { try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { - boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc); + boolean isTableIdCaseInsensitive = DebeziumUtils.isTableIdCaseInsensitive(jdbc); splitAssigner = new MySqlHybridSplitAssigner( sourceConfig, enumContext.currentParallelism(), new ArrayList<>(), - isTableIdCaseSensitive, + isTableIdCaseInsensitive, enumContext); } catch (Exception e) { throw new FlinkRuntimeException( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java index d66e6f3da1f..2814078513f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java @@ -60,7 +60,7 @@ public MySqlHybridSplitAssigner( MySqlSourceConfig sourceConfig, int currentParallelism, List remainingTables, - boolean isTableIdCaseSensitive, + boolean isTableIdCaseInsensitive, SplitEnumeratorContext enumeratorContext) { this( sourceConfig, @@ -68,7 +68,7 @@ public MySqlHybridSplitAssigner( sourceConfig, currentParallelism, remainingTables, - isTableIdCaseSensitive, + isTableIdCaseInsensitive, enumeratorContext), false, sourceConfig.getSplitMetaGroupSize()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index e295fb51fdb..9feaf99b972 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -88,7 +88,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { private volatile Throwable uncaughtSplitterException; private AssignerStatus assignerStatus; private MySqlChunkSplitter chunkSplitter; - private boolean isTableIdCaseSensitive; + private boolean isTableIdCaseInsensitive; private ExecutorService executor; @Nullable private Long checkpointIdToFinish; @@ -97,7 +97,7 @@ public MySqlSnapshotSplitAssigner( MySqlSourceConfig sourceConfig, int currentParallelism, List remainingTables, - boolean isTableIdCaseSensitive, + boolean isTableIdCaseInsensitive, SplitEnumeratorContext enumeratorContext) { this( sourceConfig, @@ -109,7 +109,7 @@ public MySqlSnapshotSplitAssigner( new HashMap<>(), AssignerStatus.INITIAL_ASSIGNING, remainingTables, - isTableIdCaseSensitive, + isTableIdCaseInsensitive, true, ChunkSplitterState.NO_SPLITTING_TABLE_STATE, enumeratorContext); @@ -130,7 +130,7 @@ public MySqlSnapshotSplitAssigner( checkpoint.getSplitFinishedOffsets(), checkpoint.getSnapshotAssignerStatus(), checkpoint.getRemainingTables(), - checkpoint.isTableIdCaseSensitive(), + checkpoint.isTableIdCaseInsensitive(), checkpoint.isRemainingTablesCheckpointed(), checkpoint.getChunkSplitterState(), enumeratorContext); @@ -146,7 +146,7 @@ private MySqlSnapshotSplitAssigner( Map splitFinishedOffsets, AssignerStatus assignerStatus, List remainingTables, - boolean isTableIdCaseSensitive, + boolean isTableIdCaseInsensitive, boolean isRemainingTablesCheckpointed, ChunkSplitterState chunkSplitterState, SplitEnumeratorContext enumeratorContext) { @@ -170,9 +170,9 @@ private MySqlSnapshotSplitAssigner( this.assignerStatus = assignerStatus; this.remainingTables = new CopyOnWriteArrayList<>(remainingTables); this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; - this.isTableIdCaseSensitive = isTableIdCaseSensitive; + this.isTableIdCaseInsensitive = isTableIdCaseInsensitive; this.chunkSplitter = - createChunkSplitter(sourceConfig, isTableIdCaseSensitive, chunkSplitterState); + createChunkSplitter(sourceConfig, isTableIdCaseInsensitive, chunkSplitterState); this.partition = new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName()); this.enumeratorContext = enumeratorContext; @@ -196,7 +196,7 @@ private void discoveryCaptureTables() { final List discoverTables = DebeziumUtils.discoverCapturedTables(jdbc, sourceConfig); this.remainingTables.addAll(discoverTables); - this.isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc); + this.isTableIdCaseInsensitive = DebeziumUtils.isTableIdCaseInsensitive(jdbc); } catch (Exception e) { throw new FlinkRuntimeException("Failed to discovery tables to capture", e); } @@ -213,7 +213,7 @@ else if (!isRemainingTablesCheckpointed DebeziumUtils.discoverCapturedTables(jdbc, sourceConfig); discoverTables.removeAll(alreadyProcessedTables); this.remainingTables.addAll(discoverTables); - this.isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc); + this.isTableIdCaseInsensitive = DebeziumUtils.isTableIdCaseInsensitive(jdbc); } catch (Exception e) { throw new FlinkRuntimeException( "Failed to discover remaining tables to capture", e); @@ -454,7 +454,7 @@ public SnapshotPendingSplitsState snapshotState(long checkpointId) { splitFinishedOffsets, assignerStatus, remainingTables, - isTableIdCaseSensitive, + isTableIdCaseInsensitive, true, chunkSplitter.snapshotState(checkpointId)); // we need a complete checkpoint before mark this assigner to be finished, to wait for @@ -612,9 +612,9 @@ private void checkSplitterErrors() { private static MySqlChunkSplitter createChunkSplitter( MySqlSourceConfig sourceConfig, - boolean isTableIdCaseSensitive, + boolean isTableIdCaseInsensitive, ChunkSplitterState chunkSplitterState) { - MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, isTableIdCaseSensitive); + MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, isTableIdCaseInsensitive); if (!ChunkSplitterState.NO_SPLITTING_TABLE_STATE.equals(chunkSplitterState)) { TableId tableId = chunkSplitterState.getCurrentSplittingTableId(); return new MySqlChunkSplitter( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java index bb3e2d2eda4..25799e02e7d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java @@ -161,7 +161,7 @@ private void serializeSnapshotPendingSplitsState( writeFinishedOffsets(state.getSplitFinishedOffsets(), out); out.writeInt(state.getSnapshotAssignerStatus().getStatusCode()); writeTableIds(state.getRemainingTables(), out); - out.writeBoolean(state.isTableIdCaseSensitive()); + out.writeBoolean(state.isTableIdCaseInsensitive()); MySqlSplitSerializer.writeTableSchemas(state.getTableSchemas(), out); boolean hasTableIsSplitting = @@ -265,7 +265,7 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState( } } List remainingTableIds = readTableIds(in); - boolean isTableIdCaseSensitive = in.readBoolean(); + boolean isTableIdCaseInsensitive = in.readBoolean(); final List remainingSchemalessSplits = new ArrayList<>(); final Map assignedSchemalessSnapshotSplits = new HashMap<>(); @@ -306,7 +306,7 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState( finishedOffsets, assignerStatus, remainingTableIds, - isTableIdCaseSensitive, + isTableIdCaseInsensitive, true, splittingTableId == null ? ChunkSplitterState.NO_SPLITTING_TABLE_STATE diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java index 39aa71ad079..fb43e4fd213 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java @@ -62,7 +62,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState { private final AssignerStatus assignerStatus; /** Whether the table identifier is case-sensitive. */ - private final boolean isTableIdCaseSensitive; + private final boolean isTableIdCaseInsensitive; /** Whether the remaining tables are keep when snapshot state. */ private final boolean isRemainingTablesCheckpointed; @@ -80,7 +80,7 @@ public SnapshotPendingSplitsState( Map splitFinishedOffsets, AssignerStatus assignerStatus, List remainingTables, - boolean isTableIdCaseSensitive, + boolean isTableIdCaseInsensitive, boolean isRemainingTablesCheckpointed, ChunkSplitterState chunkSplitterState) { this.alreadyProcessedTables = alreadyProcessedTables; @@ -89,7 +89,7 @@ public SnapshotPendingSplitsState( this.splitFinishedOffsets = splitFinishedOffsets; this.assignerStatus = assignerStatus; this.remainingTables = remainingTables; - this.isTableIdCaseSensitive = isTableIdCaseSensitive; + this.isTableIdCaseInsensitive = isTableIdCaseInsensitive; this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; this.tableSchemas = tableSchemas; this.chunkSplitterState = chunkSplitterState; @@ -123,8 +123,8 @@ public List getRemainingTables() { return remainingTables; } - public boolean isTableIdCaseSensitive() { - return isTableIdCaseSensitive; + public boolean isTableIdCaseInsensitive() { + return isTableIdCaseInsensitive; } public boolean isRemainingTablesCheckpointed() { @@ -145,7 +145,7 @@ public boolean equals(Object o) { } SnapshotPendingSplitsState that = (SnapshotPendingSplitsState) o; return assignerStatus == that.assignerStatus - && isTableIdCaseSensitive == that.isTableIdCaseSensitive + && isTableIdCaseInsensitive == that.isTableIdCaseInsensitive && isRemainingTablesCheckpointed == that.isRemainingTablesCheckpointed && Objects.equals(remainingTables, that.remainingTables) && Objects.equals(alreadyProcessedTables, that.alreadyProcessedTables) @@ -164,7 +164,7 @@ public int hashCode() { assignedSplits, splitFinishedOffsets, assignerStatus, - isTableIdCaseSensitive, + isTableIdCaseInsensitive, isRemainingTablesCheckpointed, chunkSplitterState); } @@ -184,8 +184,8 @@ public String toString() { + splitFinishedOffsets + ", assignerStatus=" + assignerStatus - + ", isTableIdCaseSensitive=" - + isTableIdCaseSensitive + + ", isTableIdCaseInsensitive=" + + isTableIdCaseInsensitive + ", isRemainingTablesCheckpointed=" + isRemainingTablesCheckpointed + ", chunkSplitterState=" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java index dedeab2dd5e..dc88dac0cf7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java @@ -153,7 +153,7 @@ public static Map discoverSchemaForCapturedTables( // fetch table schemas try (MySqlSchema mySqlSchema = - new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) { + new MySqlSchema(sourceConfig, jdbc.isTableIdCaseInsensitive())) { Map tableSchemas = new HashMap<>(); for (TableId tableId : capturedTableIds) { TableChange tableSchema = mySqlSchema.getTableSchema(partition, jdbc, tableId); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDialect.java index 8b4c0aa0f6d..af82ef9fd34 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDialect.java @@ -73,7 +73,7 @@ public Offset displayCurrentOffset(JdbcSourceConfig sourceConfig) { } @Override - public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) { + public boolean isDataCollectionIdCaseInsensitive(JdbcSourceConfig sourceConfig) { try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) { OracleConnection oracleConnection = (OracleConnection) jdbcConnection; return oracleConnection.getOracleVersion().getMajor() == 11; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java index 000cfa9c4da..71f5c35399a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java @@ -153,7 +153,7 @@ public Offset displayCommittedOffset(JdbcSourceConfig sourceConfig) { } @Override - public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) { + public boolean isDataCollectionIdCaseInsensitive(JdbcSourceConfig sourceConfig) { // from Postgres docs: // // SQL is case insensitive about key words and identifiers, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java index 5ce690f0a3e..b62e09b98fa 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java @@ -321,14 +321,14 @@ public PostgresSourceEnumerator createEnumerator( try { final List remainingTables = dataSourceDialect.discoverDataCollections(sourceConfig); - boolean isTableIdCaseSensitive = - dataSourceDialect.isDataCollectionIdCaseSensitive(sourceConfig); + boolean isTableIdCaseInsensitive = + dataSourceDialect.isDataCollectionIdCaseInsensitive(sourceConfig); splitAssigner = new HybridSplitAssigner<>( sourceConfig, enumContext.currentParallelism(), remainingTables, - isTableIdCaseSensitive, + isTableIdCaseInsensitive, dataSourceDialect, offsetFactory, enumContext); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java index ee3dcdcb06c..82e21315565 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java @@ -381,7 +381,7 @@ private List getSnapshotSplits( sourceConfig, DEFAULT_PARALLELISM, discoverTables, - sourceDialect.isDataCollectionIdCaseSensitive(sourceConfig), + sourceDialect.isDataCollectionIdCaseInsensitive(sourceConfig), sourceDialect, offsetFactory); snapshotSplitAssigner.initEnumeratorMetrics( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerDialect.java index 01f45bb6514..b67657ce0f2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerDialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerDialect.java @@ -75,7 +75,7 @@ public Offset displayCurrentOffset(JdbcSourceConfig sourceConfig) { } @Override - public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) { + public boolean isDataCollectionIdCaseInsensitive(JdbcSourceConfig sourceConfig) { return true; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java index ae6fe0f3b9e..bfad2dd5810 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java @@ -330,7 +330,7 @@ private List getSnapshotSplits( sourceConfig, DEFAULT_PARALLELISM, discoverTables, - sourceDialect.isDataCollectionIdCaseSensitive(sourceConfig), + sourceDialect.isDataCollectionIdCaseInsensitive(sourceConfig), sourceDialect, offsetFactory); snapshotSplitAssigner.initEnumeratorMetrics(