Skip to content

[FLINK-37934][cdc-source-connectors] Get the case-sensitive variable names and method names of table id are not standardized, and need to be adjusted #4041

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static Schema getTableSchema(
MySqlConnection jdbc) {
// fetch table schemas
try (MySqlSchema mySqlSchema =
new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) {
new MySqlSchema(sourceConfig, jdbc.isTableIdCaseInsensitive())) {
TableChanges.TableChange tableSchema =
mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId));
return toSchema(tableSchema.getTable(), sourceConfig.isTreatTinyInt1AsBoolean());
Expand Down Expand Up @@ -160,7 +160,7 @@ public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {

public static boolean isTableIdCaseInsensitive(MySqlSourceConfig sourceConfig) {
try (MySqlConnection jdbc = createMySqlConnection(sourceConfig)) {
return jdbc.isTableIdCaseSensitive();
return jdbc.isTableIdCaseInsensitive();
} catch (Exception e) {
throw new RuntimeException("Error to get table id caseSensitive: " + e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ default Offset displayCommittedOffset(C sourceConfig) {
}

/** Check if the CollectionId is case-sensitive or not. */
boolean isDataCollectionIdCaseSensitive(C sourceConfig);
boolean isDataCollectionIdCaseInsensitive(C sourceConfig);

/** Returns the {@link ChunkSplitter} which used to split collection to splits. */
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ public SplitEnumerator<SourceSplitBase, PendingSplitsState> createEnumerator(
try {
final List<TableId> remainingTables =
dataSourceDialect.discoverDataCollections(sourceConfig);
boolean isTableIdCaseSensitive =
dataSourceDialect.isDataCollectionIdCaseSensitive(sourceConfig);
boolean isTableIdCaseInsensitive =
dataSourceDialect.isDataCollectionIdCaseInsensitive(sourceConfig);
splitAssigner =
new HybridSplitAssigner<>(
sourceConfig,
enumContext.currentParallelism(),
remainingTables,
isTableIdCaseSensitive,
isTableIdCaseInsensitive,
dataSourceDialect,
offsetFactory,
enumContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public HybridSplitAssigner(
C sourceConfig,
int currentParallelism,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isTableIdCaseInsensitive,
DataSourceDialect<C> dialect,
OffsetFactory offsetFactory,
SplitEnumeratorContext<? extends SourceSplit> enumeratorContext) {
Expand All @@ -81,7 +81,7 @@ public HybridSplitAssigner(
sourceConfig,
currentParallelism,
remainingTables,
isTableIdCaseSensitive,
isTableIdCaseInsensitive,
dialect,
offsetFactory),
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
private final boolean isRemainingTablesCheckpointed;

private ChunkSplitter chunkSplitter;
private boolean isTableIdCaseSensitive;
private boolean isTableIdCaseInsensitive;

@Nullable private Long checkpointIdToFinish;
private final DataSourceDialect<C> dialect;
Expand All @@ -102,7 +102,7 @@ public SnapshotSplitAssigner(
C sourceConfig,
int currentParallelism,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isTableIdCaseInsensitive,
DataSourceDialect<C> dialect,
OffsetFactory offsetFactory) {
this(
Expand All @@ -115,7 +115,7 @@ public SnapshotSplitAssigner(
new HashMap<>(),
INITIAL_ASSIGNING,
remainingTables,
isTableIdCaseSensitive,
isTableIdCaseInsensitive,
true,
dialect,
offsetFactory,
Expand All @@ -139,7 +139,7 @@ public SnapshotSplitAssigner(
checkpoint.getSplitFinishedOffsets(),
checkpoint.getSnapshotAssignerStatus(),
checkpoint.getRemainingTables(),
checkpoint.isTableIdCaseSensitive(),
checkpoint.isTableIdCaseInsensitive(),
checkpoint.isRemainingTablesCheckpointed(),
dialect,
offsetFactory,
Expand All @@ -157,7 +157,7 @@ private SnapshotSplitAssigner(
Map<String, Offset> splitFinishedOffsets,
AssignerStatus assignerStatus,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isTableIdCaseInsensitive,
boolean isRemainingTablesCheckpointed,
DataSourceDialect<C> dialect,
OffsetFactory offsetFactory,
Expand All @@ -183,7 +183,7 @@ private SnapshotSplitAssigner(
this.assignerStatus = assignerStatus;
this.remainingTables = new CopyOnWriteArrayList<>(remainingTables);
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
this.dialect = dialect;
this.offsetFactory = offsetFactory;
this.splitFinishedCheckpointIds = splitFinishedCheckpointIds;
Expand Down Expand Up @@ -524,7 +524,7 @@ public SnapshotPendingSplitsState snapshotState(long checkpointId) {
splitFinishedOffsets,
assignerStatus,
remainingTables,
isTableIdCaseSensitive,
isTableIdCaseInsensitive,
true,
splitFinishedCheckpointIds,
chunkSplitter.snapshotState(checkpointId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private void serializeSnapshotPendingSplitsState(
writeFinishedOffsets(state.getSplitFinishedOffsets(), out);
out.writeInt(state.getSnapshotAssignerStatus().getStatusCode());
writeTableIds(state.getRemainingTables(), out);
out.writeBoolean(state.isTableIdCaseSensitive());
out.writeBoolean(state.isTableIdCaseInsensitive());
writeTableSchemas(state.getTableSchemas(), out);

writeSplitFinishedCheckpointIds(state.getSplitFinishedCheckpointIds(), out);
Expand Down Expand Up @@ -283,7 +283,7 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
}
}
List<TableId> remainingTableIds = readTableIds(version, in);
boolean isTableIdCaseSensitive = in.readBoolean();
boolean isTableIdCaseInsensitive = in.readBoolean();
final List<SchemalessSnapshotSplit> remainingSchemalessSplits = new ArrayList<>();
final Map<String, SchemalessSnapshotSplit> assignedSchemalessSnapshotSplits =
new HashMap<>();
Expand Down Expand Up @@ -333,7 +333,7 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
finishedOffsets,
assignerStatus,
remainingTableIds,
isTableIdCaseSensitive,
isTableIdCaseInsensitive,
true,
splitFinishedCheckpointIds,
chunkSplitterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
private final AssignerStatus assignerStatus;

/** Whether the table identifier is case sensitive. */
private final boolean isTableIdCaseSensitive;
private final boolean isTableIdCaseInsensitive;

/** Whether the remaining tables are keep when snapshot state. */
private final boolean isRemainingTablesCheckpointed;
Expand All @@ -85,7 +85,7 @@ public SnapshotPendingSplitsState(
Map<String, Offset> splitFinishedOffsets,
AssignerStatus assignerStatus,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isTableIdCaseInsensitive,
boolean isRemainingTablesCheckpointed,
Map<String, Long> splitFinishedCheckpointIds,
ChunkSplitterState chunkSplitterState) {
Expand All @@ -95,7 +95,7 @@ public SnapshotPendingSplitsState(
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerStatus = assignerStatus;
this.remainingTables = remainingTables;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.tableSchemas = tableSchemas;
this.chunkSplitterState = chunkSplitterState;
Expand Down Expand Up @@ -134,8 +134,8 @@ public List<TableId> getRemainingTables() {
return remainingTables;
}

public boolean isTableIdCaseSensitive() {
return isTableIdCaseSensitive;
public boolean isTableIdCaseInsensitive() {
return isTableIdCaseInsensitive;
}

public boolean isRemainingTablesCheckpointed() {
Expand All @@ -156,7 +156,7 @@ public boolean equals(Object o) {
}
SnapshotPendingSplitsState that = (SnapshotPendingSplitsState) o;
return Objects.equals(assignerStatus, that.assignerStatus)
&& isTableIdCaseSensitive == that.isTableIdCaseSensitive
&& isTableIdCaseInsensitive == that.isTableIdCaseInsensitive
&& isRemainingTablesCheckpointed == that.isRemainingTablesCheckpointed
&& Objects.equals(remainingTables, that.remainingTables)
&& Objects.equals(alreadyProcessedTables, that.alreadyProcessedTables)
Expand All @@ -176,7 +176,7 @@ public int hashCode() {
assignedSplits,
splitFinishedOffsets,
assignerStatus,
isTableIdCaseSensitive,
isTableIdCaseInsensitive,
isRemainingTablesCheckpointed,
splitFinishedCheckpointIds,
chunkSplitterState);
Expand All @@ -197,8 +197,8 @@ public String toString() {
+ splitFinishedOffsets
+ ", assignerStatus="
+ assignerStatus
+ ", isTableIdCaseSensitive="
+ isTableIdCaseSensitive
+ ", isTableIdCaseInsensitive="
+ isTableIdCaseInsensitive
+ ", isRemainingTablesCheckpointed="
+ isRemainingTablesCheckpointed
+ ", splitFinishedCheckpointIds="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Offset displayCurrentOffset(JdbcSourceConfig sourceConfig) {
}

@Override
public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
public boolean isDataCollectionIdCaseInsensitive(JdbcSourceConfig sourceConfig) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public ChangeStreamOffset displayCurrentOffset(MongoDBSourceConfig sourceConfig)
}

@Override
public boolean isDataCollectionIdCaseSensitive(MongoDBSourceConfig sourceConfig) {
public boolean isDataCollectionIdCaseInsensitive(MongoDBSourceConfig sourceConfig) {
// MongoDB's database names and collection names are case-sensitive.
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ public OptionalLong getEstimatedTableSize(TableId tableId) {
return OptionalLong.empty();
}

public boolean isTableIdCaseSensitive() {
public boolean isTableIdCaseInsensitive() {
return !"0"
.equals(
readMySqlSystemVariables()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static BinaryLogClient createBinaryClient(Configuration dbzConfiguration)

/** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas. */
public static MySqlDatabaseSchema createMySqlDatabaseSchema(
MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) {
MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseInsensitive) {
TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(dbzMySqlConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
MySqlValueConverters valueConverters = getValueConverters(dbzMySqlConfig);
Expand All @@ -113,7 +113,7 @@ public static MySqlDatabaseSchema createMySqlDatabaseSchema(
valueConverters,
topicSelector,
schemaNameAdjuster,
isTableIdCaseSensitive);
isTableIdCaseInsensitive);
}

/** Fetch current binlog offsets in MySql Server. */
Expand Down Expand Up @@ -208,7 +208,7 @@ public static List<TableId> discoverCapturedTables(
return capturedTableIds;
}

public static boolean isTableIdCaseSensitive(JdbcConnection connection) {
public static boolean isTableIdCaseInsensitive(JdbcConnection connection) {
return !"0"
.equals(
readMySqlSystemVariables(connection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public StatefulTaskContext(

public void configure(MySqlSplit mySqlSplit) {
// initial stateful objects
final boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive();
final boolean tableIdCaseInsensitive = connection.isTableIdCaseInsensitive();
this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);
EmbeddedFlinkDatabaseHistory.registerHistory(
sourceConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ public class MySqlSchema implements AutoCloseable {
private final MySqlDatabaseSchema databaseSchema;
private final Map<TableId, TableChange> schemasByTableId;

public MySqlSchema(MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive) {
public MySqlSchema(MySqlSourceConfig sourceConfig, boolean isTableIdCaseInsensitive) {
this.connectorConfig = sourceConfig.getMySqlConnectorConfig();
this.databaseSchema =
DebeziumUtils.createMySqlDatabaseSchema(connectorConfig, isTableIdCaseSensitive);
DebeziumUtils.createMySqlDatabaseSchema(connectorConfig, isTableIdCaseInsensitive);
this.schemasByTableId = new HashMap<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,27 +204,27 @@ public SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator(
// In snapshot-only startup option, only split snapshots.
if (sourceConfig.getStartupOptions().isSnapshotOnly()) {
try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) {
boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
boolean isTableIdCaseInsensitive = DebeziumUtils.isTableIdCaseInsensitive(jdbc);
splitAssigner =
new MySqlSnapshotSplitAssigner(
sourceConfig,
enumContext.currentParallelism(),
new ArrayList<>(),
isTableIdCaseSensitive,
isTableIdCaseInsensitive,
enumContext);
} catch (Exception e) {
throw new FlinkRuntimeException(
"Failed to discover captured tables for enumerator", e);
}
} else if (!sourceConfig.getStartupOptions().isStreamOnly()) {
try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) {
boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
boolean isTableIdCaseInsensitive = DebeziumUtils.isTableIdCaseInsensitive(jdbc);
splitAssigner =
new MySqlHybridSplitAssigner(
sourceConfig,
enumContext.currentParallelism(),
new ArrayList<>(),
isTableIdCaseSensitive,
isTableIdCaseInsensitive,
enumContext);
} catch (Exception e) {
throw new FlinkRuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ public MySqlHybridSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isTableIdCaseInsensitive,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this(
sourceConfig,
new MySqlSnapshotSplitAssigner(
sourceConfig,
currentParallelism,
remainingTables,
isTableIdCaseSensitive,
isTableIdCaseInsensitive,
enumeratorContext),
false,
sourceConfig.getSplitMetaGroupSize());
Expand Down
Loading