Skip to content

Commit ae7c4b5

Browse files
committed
[FLINK-37676][cdc-common] Add caching mechanism to Selectors for improved performance
1 parent 95fe4d3 commit ae7c4b5

File tree

1 file changed

+24
-0
lines changed

1 file changed

+24
-0
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
import org.apache.flink.cdc.common.event.TableId;
2121
import org.apache.flink.cdc.common.utils.Predicates;
2222

23+
import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
24+
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
25+
26+
import java.time.Duration;
2327
import java.util.ArrayList;
2428
import java.util.Iterator;
2529
import java.util.List;
@@ -29,8 +33,16 @@
2933
/** Selectors for filtering tables. */
3034
public class Selectors {
3135

36+
private static final Duration CACHE_EXPIRE_DURATION = Duration.ofHours(1);
37+
3238
private List<Selector> selectors;
3339

40+
private final Cache<TableId, Boolean> cache =
41+
CacheBuilder.newBuilder()
42+
.expireAfterAccess(CACHE_EXPIRE_DURATION)
43+
.maximumSize(1024)
44+
.build();
45+
3446
private Selectors() {}
3547

3648
/**
@@ -73,6 +85,18 @@ public boolean isMatch(TableId tableId) {
7385

7486
/** Match the {@link TableId} against the {@link Selector}s. * */
7587
public boolean isMatch(TableId tableId) {
88+
Boolean cachedResult = cache.getIfPresent(tableId);
89+
if (cachedResult != null) {
90+
return cachedResult;
91+
}
92+
93+
boolean match = computeIsMatch(tableId);
94+
cache.put(tableId, match);
95+
return match;
96+
}
97+
98+
/** Computes the match result if not present in the cache */
99+
private boolean computeIsMatch(TableId tableId) {
76100
for (Selector selector : selectors) {
77101
if (selector.isMatch(tableId)) {
78102
return true;

0 commit comments

Comments
 (0)