Skip to content

Commit 6aedb12

Browse files
committed
MySqlDatabaseSchema include gh-ost ddl in HistorizedRelationalDatabaseSchema
1 parent 224a075 commit 6aedb12

File tree

5 files changed

+931
-4
lines changed

5 files changed

+931
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,339 @@
1+
/*
2+
* Copyright Debezium Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
package io.debezium.relational.ddl;
7+
8+
import io.debezium.annotation.NotThreadSafe;
9+
import io.debezium.relational.RelationalTableFilters;
10+
import io.debezium.relational.TableId;
11+
import io.debezium.relational.Tables.TableFilter;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
import java.util.ArrayList;
16+
import java.util.HashSet;
17+
import java.util.List;
18+
import java.util.Set;
19+
import java.util.function.Predicate;
20+
import java.util.regex.Pattern;
21+
22+
/**
23+
* Copied from Debezium project. A {@link DdlParserListener} that accumulates changes, allowing them
24+
* to be consumed in the same order by database.
25+
*
26+
* @author Randall Hauch
27+
*/
28+
@NotThreadSafe
29+
public class DdlChanges implements DdlParserListener {
30+
31+
private static final Logger LOGGER = LoggerFactory.getLogger(DdlChanges.class);
32+
private final String terminator;
33+
private final List<Event> events = new ArrayList<>();
34+
private final Set<String> databaseNames = new HashSet<>();
35+
private static final Pattern GHOST_TABLE_PATTERN = Pattern.compile("^_(.*)_(del|gho|ghc)$");
36+
37+
/** Create a new changes object with ';' as the terminator token. */
38+
public DdlChanges() {
39+
this(null);
40+
}
41+
42+
/**
43+
* Create a new changes object with the designated terminator token.
44+
*
45+
* @param terminator the token used to terminate each statement; may be null
46+
*/
47+
public DdlChanges(String terminator) {
48+
this.terminator = terminator != null ? terminator : ";";
49+
}
50+
51+
/**
52+
* Clear all accumulated changes.
53+
*
54+
* @return this object for method chaining; never null
55+
*/
56+
public DdlChanges reset() {
57+
events.clear();
58+
databaseNames.clear();
59+
return this;
60+
}
61+
62+
@Override
63+
public void handle(Event event) {
64+
events.add(event);
65+
databaseNames.add(getDatabase(event));
66+
}
67+
68+
/**
69+
* Consume the events in the same order they were {@link
70+
* #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded}, but grouped by
71+
* database name. Multiple sequential statements that were applied to the same database are
72+
* grouped together.
73+
*
74+
* @param consumer the consumer
75+
*/
76+
public void groupStatementStringsByDatabase(DatabaseStatementStringConsumer consumer) {
77+
groupEventsByDatabase(
78+
(DatabaseEventConsumer)
79+
(dbName, eventList) -> {
80+
final StringBuilder statements = new StringBuilder();
81+
final Set<TableId> tables = new HashSet<>();
82+
eventList.forEach(
83+
event -> {
84+
statements.append(event.statement());
85+
statements.append(terminator);
86+
addTable(tables, event);
87+
});
88+
consumer.consume(dbName, tables, statements.toString());
89+
});
90+
}
91+
92+
private void addTable(final Set<TableId> tables, Event event) {
93+
if (event instanceof TableEvent) {
94+
tables.add(((TableEvent) event).tableId());
95+
}
96+
}
97+
98+
/**
99+
* Consume the events in the same order they were {@link
100+
* #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded}, but grouped by
101+
* database name. Multiple sequential statements that were applied to the same database are
102+
* grouped together.
103+
*
104+
* @param consumer the consumer
105+
*/
106+
public void groupStatementsByDatabase(DatabaseStatementConsumer consumer) {
107+
groupEventsByDatabase(
108+
(DatabaseEventConsumer)
109+
(dbName, eventList) -> {
110+
List<String> statements = new ArrayList<>();
111+
final Set<TableId> tables = new HashSet<>();
112+
eventList.forEach(
113+
event -> {
114+
statements.add(event.statement());
115+
addTable(tables, event);
116+
});
117+
consumer.consume(dbName, tables, statements);
118+
});
119+
}
120+
121+
/**
122+
* Consume the events in the same order they were {@link
123+
* #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded}, but grouped by
124+
* database name. Multiple sequential statements that were applied to the same database are
125+
* grouped together.
126+
*
127+
* @param consumer the consumer
128+
*/
129+
public void groupEventsByDatabase(DatabaseEventConsumer consumer) {
130+
if (isEmpty()) {
131+
return;
132+
}
133+
if (databaseNames.size() <= 1) {
134+
consumer.consume(databaseNames.iterator().next(), events);
135+
return;
136+
}
137+
List<Event> dbEvents = new ArrayList<>();
138+
String currentDatabase = null;
139+
for (Event event : events) {
140+
String dbName = getDatabase(event);
141+
if (currentDatabase == null || dbName.equals(currentDatabase)) {
142+
currentDatabase = dbName;
143+
// Accumulate the statement ...
144+
dbEvents.add(event);
145+
} else {
146+
// Submit the statements ...
147+
consumer.consume(currentDatabase, dbEvents);
148+
}
149+
}
150+
}
151+
152+
/**
153+
* Consume the events in the same order they were {@link
154+
* #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded}, but grouped by
155+
* database name. Multiple sequential statements that were applied to the same database are
156+
* grouped together.
157+
*
158+
* @param consumer the consumer
159+
*/
160+
public void getEventsByDatabase(DatabaseEventConsumer consumer) {
161+
if (isEmpty()) {
162+
return;
163+
}
164+
if (databaseNames.size() <= 1) {
165+
consumer.consume(databaseNames.iterator().next(), events);
166+
return;
167+
}
168+
List<Event> dbEvents = new ArrayList<>();
169+
String currentDatabase = null;
170+
for (Event event : events) {
171+
String dbName = getDatabase(event);
172+
if (currentDatabase == null || dbName.equals(currentDatabase)) {
173+
currentDatabase = dbName;
174+
// Accumulate the statement ...
175+
dbEvents.add(event);
176+
} else {
177+
// Submit the statements ...
178+
consumer.consume(currentDatabase, dbEvents);
179+
dbEvents = new ArrayList<>();
180+
currentDatabase = dbName;
181+
// Accumulate the statement ...
182+
dbEvents.add(event);
183+
}
184+
}
185+
if (!dbEvents.isEmpty()) {
186+
consumer.consume(currentDatabase, dbEvents);
187+
}
188+
}
189+
190+
protected String getDatabase(Event event) {
191+
switch (event.type()) {
192+
case CREATE_TABLE:
193+
case ALTER_TABLE:
194+
case DROP_TABLE:
195+
case TRUNCATE_TABLE:
196+
TableEvent tableEvent = (TableEvent) event;
197+
return tableEvent.tableId().catalog();
198+
case CREATE_INDEX:
199+
case DROP_INDEX:
200+
TableIndexEvent tableIndexEvent = (TableIndexEvent) event;
201+
return tableIndexEvent.tableId().catalog();
202+
case CREATE_DATABASE:
203+
case ALTER_DATABASE:
204+
case DROP_DATABASE:
205+
case USE_DATABASE:
206+
DatabaseEvent dbEvent = (DatabaseEvent) event;
207+
return dbEvent.databaseName();
208+
case SET_VARIABLE:
209+
SetVariableEvent varEvent = (SetVariableEvent) event;
210+
return varEvent.databaseName().orElse("");
211+
}
212+
assert false : "Should never happen";
213+
return null;
214+
}
215+
216+
public boolean isEmpty() {
217+
return events.isEmpty();
218+
}
219+
220+
public boolean applyToMoreDatabasesThan(String name) {
221+
return databaseNames.contains(name) ? databaseNames.size() > 1 : databaseNames.size() > 0;
222+
}
223+
224+
@Override
225+
public String toString() {
226+
return events.toString();
227+
}
228+
229+
public static interface DatabaseEventConsumer {
230+
void consume(String databaseName, List<Event> events);
231+
}
232+
233+
public static interface DatabaseStatementConsumer {
234+
void consume(String databaseName, Set<TableId> tableList, List<String> ddlStatements);
235+
}
236+
237+
public static interface DatabaseStatementStringConsumer {
238+
void consume(String databaseName, Set<TableId> tableList, String ddlStatements);
239+
}
240+
241+
/**
242+
* @return true if any event stored is one of
243+
* <ul>
244+
* <li>database-wide events and affects included/excluded database
245+
* <li>table related events and the table is included
246+
* <li>events that set a variable and either affects included database or is a system-wide
247+
* variable
248+
* <ul>
249+
*/
250+
@Deprecated
251+
public boolean anyMatch(Predicate<String> databaseFilter, Predicate<TableId> tableFilter) {
252+
return events.stream()
253+
.anyMatch(
254+
event ->
255+
(event instanceof DatabaseEvent)
256+
&& databaseFilter.test(
257+
((DatabaseEvent) event).databaseName())
258+
|| (event instanceof TableEvent)
259+
&& tableFilter.test(((TableEvent) event).tableId())
260+
|| (event instanceof SetVariableEvent)
261+
&& (!((SetVariableEvent) event)
262+
.databaseName()
263+
.isPresent()
264+
|| databaseFilter.test(
265+
((SetVariableEvent) event)
266+
.databaseName()
267+
.get())));
268+
}
269+
270+
/**
271+
* @return true if any event stored is one of
272+
* <ul>
273+
* <li>database-wide events and affects included/excluded database
274+
* <li>table related events and the table is included
275+
* <li>events that set a variable and either affects included database or is a system-wide
276+
* variable
277+
* <ul>
278+
*/
279+
// TODO javadoc
280+
public boolean anyMatch(RelationalTableFilters filters) {
281+
Predicate<String> databaseFilter = filters.databaseFilter();
282+
TableFilter tableFilter = filters.dataCollectionFilter();
283+
return events.stream()
284+
.anyMatch(
285+
event ->
286+
(event instanceof DatabaseEvent)
287+
&& databaseFilter.test(
288+
((DatabaseEvent) event).databaseName())
289+
|| (event instanceof TableEvent)
290+
&& tableFilter.isIncluded(
291+
((TableEvent) event).tableId())
292+
|| (event instanceof SetVariableEvent)
293+
&& (!((SetVariableEvent) event)
294+
.databaseName()
295+
.isPresent()
296+
|| databaseFilter.test(
297+
((SetVariableEvent) event)
298+
.databaseName()
299+
.get())));
300+
}
301+
302+
public boolean anyMatch(RelationalTableFilters filters, boolean parserOnlineDDL) {
303+
Predicate<String> databaseFilter = filters.databaseFilter();
304+
TableFilter tableFilter = filters.dataCollectionFilter();
305+
return events.stream()
306+
.anyMatch(
307+
event -> {
308+
if (event instanceof DatabaseEvent) {
309+
DatabaseEvent dbEvent = (DatabaseEvent) event;
310+
return databaseFilter.test(dbEvent.databaseName());
311+
}
312+
if (event instanceof TableEvent) {
313+
TableEvent tableEvent = (TableEvent) event;
314+
TableId tableId = tableEvent.tableId();
315+
boolean isIncludedByFilter = tableFilter.isIncluded(tableId);
316+
boolean isGhostTable =
317+
parserOnlineDDL
318+
&& tableId != null
319+
&& GHOST_TABLE_PATTERN
320+
.matcher(tableId.table())
321+
.matches();
322+
LOGGER.info(
323+
"isIncludedByFilter:{},isGhostTable:{},result:{}",
324+
isIncludedByFilter,
325+
isGhostTable,
326+
isIncludedByFilter || isGhostTable);
327+
return isIncludedByFilter || isGhostTable;
328+
}
329+
if (event instanceof SetVariableEvent) {
330+
SetVariableEvent varEvent = (SetVariableEvent) event;
331+
return !varEvent.databaseName().isPresent()
332+
|| (varEvent.databaseName().isPresent()
333+
&& databaseFilter.test(
334+
varEvent.databaseName().get()));
335+
}
336+
return false;
337+
});
338+
}
339+
}

0 commit comments

Comments
 (0)