Skip to content

[FLINK-37855][cdc-connector-mysql] Reduce the time cost in MySqlSchemaUtils#listTables #4032

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@

import javax.annotation.Nullable;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
Expand All @@ -45,6 +50,10 @@
/** Utilities for converting from debezium {@link Table} types to {@link Schema}. */
public class MySqlSchemaUtils {

private static final String[] TABLE_QUERY = {"TABLE"};
private static final Set<String> SYSTEM_DB_SET =
new HashSet<>(
Arrays.asList("information_schema", "mysql", "sys", "performance_schema"));
private static final Logger LOG = LoggerFactory.getLogger(MySqlSchemaUtils.class);

public static List<String> listDatabases(MySqlSourceConfig sourceConfig) {
Expand All @@ -58,16 +67,23 @@ public static List<String> listDatabases(MySqlSourceConfig sourceConfig) {
/**
 * Lists tables through a MySQL connection created from {@code sourceConfig} and returns them
 * as {@link TableId}s.
 *
 * <p>NOTE(review): this span comes from a diff view whose +/- markers were stripped, so lines
 * of the previous implementation (a loop calling {@code listTables(jdbc, database)} per
 * database) and of the new one (a single {@code DatabaseMetaData#getTables} call) appear
 * interleaved. It is not compilable as shown; confirm against the real file.
 *
 * @param sourceConfig configuration used to open the MySQL connection
 * @param dbName catalog argument handed to {@code getTables}; may be {@code null}
 * @return the collected table ids
 * @throws RuntimeException wrapping any {@link SQLException} raised while listing
 */
public static List<TableId> listTables(
MySqlSourceConfig sourceConfig, @Nullable String dbName) {
try (MySqlConnection jdbc = createMySqlConnection(sourceConfig)) {
// Presumably the removed side of the diff: resolve the database list first ...
List<String> databases =
dbName != null ? Collections.singletonList(dbName) : listDatabases(jdbc);

List<TableId> tableIds = new ArrayList<>();
// ... then (old code, presumably removed) issue one table listing per database.
for (String database : databases) {
tableIds.addAll(listTables(jdbc, database));
// Presumably the added side of the diff: one JDBC metadata call instead of per-database queries.
// NOTE(review): try-with-resources closes the Connection returned by jdbc.connection(),
// while jdbc itself is also closed by the outer try — confirm the double close is intended.
try (Connection connection = jdbc.connection()) {
DatabaseMetaData metaData = connection.getMetaData();
// "%" is the table-name pattern; TABLE_QUERY limits the result to the "TABLE" type.
try (ResultSet resultSet = metaData.getTables(dbName, null, "%", TABLE_QUERY)) {
while (resultSet.next()) {
String database = resultSet.getString("TABLE_CAT");
// Rows whose catalog is listed in SYSTEM_DB_SET are skipped, even when dbName was
// given explicitly.
if (SYSTEM_DB_SET.contains(database)) {
continue;
}
String tableName = resultSet.getString("TABLE_NAME");
tableIds.add(TableId.tableId(database, tableName));
}
}
}
return tableIds;
} catch (SQLException e) {
// Two consecutive throws: the first looks like the removed line, the second its replacement.
throw new RuntimeException("Error to list databases: " + e.getMessage(), e);
throw new RuntimeException("Error to list tables: " + dbName, e);
}
}

Expand All @@ -86,16 +102,22 @@ public static List<String> listDatabases(JdbcConnection jdbc) throws SQLExceptio
// -------------------
// Get the list of databases ...
LOG.info("Read list of available databases");
final List<String> databaseNames = new ArrayList<>();
jdbc.query(
"SHOW DATABASES WHERE `database` NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys')",
rs -> {
while (rs.next()) {
databaseNames.add(rs.getString(1));
try (Connection connection = jdbc.connection()) {
DatabaseMetaData metaData = connection.getMetaData();
try (ResultSet resultSet = metaData.getCatalogs()) {
List<String> databaseNames = new ArrayList<>();
while (resultSet.next()) {
String dbName = resultSet.getString("TABLE_CAT");
if (SYSTEM_DB_SET.contains(dbName)) {
continue;
}
});
LOG.info("\t list of available databases are: {}", databaseNames);
return databaseNames;
databaseNames.add(dbName);
}

LOG.info("\t list of available databases are:{}", databaseNames);
return databaseNames;
}
}
}

public static List<TableId> listTables(JdbcConnection jdbc, String dbName) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -40,6 +43,7 @@
/** Utilities to discover matched tables. */
public class TableDiscoveryUtils {

private static final String[] TABLE_QUERY = {"TABLE"};
private static final Logger LOG = LoggerFactory.getLogger(TableDiscoveryUtils.class);

public static List<TableId> listTables(
Expand All @@ -50,54 +54,26 @@ public static List<TableId> listTables(
// READ DATABASE NAMES
// -------------------
// Get the list of databases ...
LOG.info("Read list of available databases");
final List<String> databaseNames = new ArrayList<>();

jdbc.query(
"SHOW DATABASES",
rs -> {
while (rs.next()) {
String databaseName = rs.getString(1);
if (databaseFilter.test(databaseName)) {
databaseNames.add(databaseName);
}
try (Connection connection = jdbc.connection()) {
DatabaseMetaData databaseMetaData = connection.getMetaData();
try (ResultSet tableResult = databaseMetaData.getTables(null, null, "%", TABLE_QUERY)) {
while (tableResult.next()) {
String dbName = tableResult.getString("TABLE_CAT");
if (!databaseFilter.test(dbName)) {
continue;
}
String tableName = tableResult.getString("TABLE_NAME");
TableId tableId = new TableId(dbName, null, tableName);
if (tableFilter.test(tableId)) {
capturedTableIds.add(tableId);
LOG.info("\t including table '{}' for further processing", tableId);
} else {
LOG.info("\t '{}' is filtered out of table capturing", tableId);
}
});
LOG.info("\t list of available databases is: {}", databaseNames);
}

// ----------------
// READ TABLE NAMES
// ----------------
// Get the list of table IDs for each database. We can't use a prepared statement with
// MySQL, so we have to build the SQL statement each time. Although in other cases this
// might lead to SQL injection, in our case we are reading the database names from the
// database and not taking them from the user ...
LOG.info("Read list of available tables in each database");
for (String dbName : databaseNames) {
try {
jdbc.query(
"SHOW FULL TABLES IN "
+ StatementUtils.quote(dbName)
+ " where Table_Type = 'BASE TABLE'",
rs -> {
while (rs.next()) {
TableId tableId = new TableId(dbName, null, rs.getString(1));
if (tableFilter.test(tableId)) {
capturedTableIds.add(tableId);
LOG.info(
"\t including table '{}' for further processing",
tableId);
} else {
LOG.info("\t '{}' is filtered out of table capturing", tableId);
}
}
});
} catch (SQLException e) {
// We were unable to execute the query or process the results, so skip this ...
LOG.warn(
"\t skipping database '{}' due to error reading tables: {}",
dbName,
e.getMessage());
} catch (Exception e) {
LOG.warn("Failed to get list of tables: {}", e.getMessage());
}
}
return capturedTableIds;
Expand Down
Loading