Skip to content

Commit 3965e1b

Browse files
committed
[FLINK-37676][cdc-common] Add caching mechanism to Selectors for improved performance
1 parent c2230d5 commit 3965e1b

File tree

3 files changed

+169
-1
lines changed

3 files changed

+169
-1
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
import org.apache.flink.cdc.common.event.TableId;
2121
import org.apache.flink.cdc.common.utils.Predicates;
2222

23+
import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
24+
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
25+
26+
import java.time.Duration;
2327
import java.util.ArrayList;
2428
import java.util.Iterator;
2529
import java.util.List;
@@ -29,8 +33,16 @@
2933
/** Selectors for filtering tables. */
3034
public class Selectors {
3135

36+
private static final Duration CACHE_EXPIRE_DURATION = Duration.ofHours(1);
37+
3238
private List<Selector> selectors;
3339

40+
private final Cache<TableId, Boolean> cache =
41+
CacheBuilder.newBuilder()
42+
.expireAfterAccess(CACHE_EXPIRE_DURATION)
43+
.maximumSize(1024)
44+
.build();
45+
3446
private Selectors() {}
3547

3648
/**
@@ -71,8 +83,20 @@ public boolean isMatch(TableId tableId) {
7183
}
7284
}
7385

74-
/** Match the {@link TableId} against the {@link Selector}s. * */
86+
/** Match the {@link TableId} against the {@link Selector}s. */
7587
public boolean isMatch(TableId tableId) {
88+
Boolean cachedResult = cache.getIfPresent(tableId);
89+
if (cachedResult != null) {
90+
return cachedResult;
91+
}
92+
93+
boolean match = computeIsMatch(tableId);
94+
cache.put(tableId, match);
95+
return match;
96+
}
97+
98+
/** Computes the match result if not present in the cache. */
99+
private boolean computeIsMatch(TableId tableId) {
76100
for (Selector selector : selectors) {
77101
if (selector.isMatch(tableId)) {
78102
return true;

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ limitations under the License.
4242
<maven.plugin.download.version>1.6.8</maven.plugin.download.version>
4343
<iceberg.version>1.6.1</iceberg.version>
4444
<hive.version>2.3.9</hive.version>
45+
<jmh.version>1.37</jmh.version>
4546
</properties>
4647

4748
<dependencies>
@@ -205,6 +206,21 @@ limitations under the License.
205206
<scope>test</scope>
206207
</dependency>
207208

209+
<!-- benchmark -->
210+
<dependency>
211+
<groupId>org.openjdk.jmh</groupId>
212+
<artifactId>jmh-core</artifactId>
213+
<version>${jmh.version}</version>
214+
<scope>test</scope>
215+
</dependency>
216+
217+
<dependency>
218+
<groupId>org.openjdk.jmh</groupId>
219+
<artifactId>jmh-generator-annprocess</artifactId>
220+
<version>${jmh.version}</version>
221+
<scope>test</scope>
222+
</dependency>
223+
208224
<!-- This is for testing Scala UDF.-->
209225
<dependency>
210226
<groupId>org.scala-lang</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package org.apache.flink.cdc.pipeline.tests.benchmark;
2+
3+
import org.apache.flink.cdc.common.event.TableId;
4+
import org.apache.flink.cdc.common.schema.Selectors;
5+
6+
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
7+
8+
import org.openjdk.jmh.annotations.Benchmark;
9+
import org.openjdk.jmh.annotations.BenchmarkMode;
10+
import org.openjdk.jmh.annotations.Fork;
11+
import org.openjdk.jmh.annotations.Level;
12+
import org.openjdk.jmh.annotations.Measurement;
13+
import org.openjdk.jmh.annotations.Mode;
14+
import org.openjdk.jmh.annotations.OutputTimeUnit;
15+
import org.openjdk.jmh.annotations.Scope;
16+
import org.openjdk.jmh.annotations.Setup;
17+
import org.openjdk.jmh.annotations.State;
18+
import org.openjdk.jmh.annotations.Threads;
19+
import org.openjdk.jmh.annotations.Warmup;
20+
import org.openjdk.jmh.runner.Runner;
21+
import org.openjdk.jmh.runner.options.Options;
22+
import org.openjdk.jmh.runner.options.OptionsBuilder;
23+
24+
import java.util.List;
25+
import java.util.concurrent.TimeUnit;
26+
27+
/**
28+
* Benchmark for table selector performance with and without cache.
29+
*
30+
* <pre>
31+
* Benchmark Mode Cnt Score Error Units
32+
* SelectorsBenchmark.testSelectorWithCache thrpt 20 1028.979 ± 218.663 ops/ms
33+
* SelectorsBenchmark.testSelectorWithoutCache thrpt 20 136.747 ± 11.872 ops/ms
34+
* </pre>
35+
*/
36+
@BenchmarkMode(Mode.Throughput)
37+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
38+
@State(Scope.Benchmark)
39+
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
40+
@Measurement(iterations = 10, time = 2, timeUnit = TimeUnit.SECONDS)
41+
@Fork(2)
42+
@Threads(2)
43+
public class SelectorsBenchmark {
44+
45+
private Selectors selectors;
46+
private List<TableId> queryTableIds;
47+
48+
@Setup(Level.Trial)
49+
public void setup() {
50+
selectors =
51+
new Selectors.SelectorsBuilder()
52+
.includeTables(
53+
"test_wms_inventory_[a-z]+.inventory_batch_detail,"
54+
+ "test_wms_inventory_[a-z]+.inventory_batch_detail_record,"
55+
+ "test_wms_inventory_[a-z]+.inventory_batch_input,"
56+
+ "test_wms_inventory_[a-z]+.inventory_flow_volume_level,"
57+
+ "test_wms_inventory_[a-z]+.inventory_snapshot,"
58+
+ "test_wms_log_[a-z]+.log_common_log_[a-z]+")
59+
.build();
60+
61+
queryTableIds =
62+
ImmutableList.of(
63+
TableId.tableId(
64+
"test_wms_common_europe.occupy_strategy_exe_progress_order"),
65+
TableId.tableId("test_wms_common_europe.wave_strategy_rule_relation"),
66+
TableId.tableId("db.sc2.A1"),
67+
TableId.tableId("db2.sc2.A1"),
68+
TableId.tableId("test_wms_output_s.out_moment_storage_location_relation"),
69+
TableId.tableId("test_wms_output_a.out_moment_storage_location_relation"));
70+
71+
// warm up cache
72+
for (TableId id : queryTableIds) {
73+
selectors.isMatch(id);
74+
}
75+
}
76+
77+
/**
78+
* Benchmark to evaluate the performance of table selector with caching enabled.
79+
*
80+
* <p>This benchmark measures throughput when using a pre-built {@link Selectors} instance that
81+
* leverages internal caching mechanisms. This simulates a typical usage scenario where selector
82+
* rules are initialized once and reused across multiple queries.
83+
*
84+
* <p>Expected to perform significantly better than non-cached version due to avoidance of
85+
* repeated regex parsing and compilation.
86+
*/
87+
@Benchmark
88+
public void testSelectorWithCache() {
89+
for (TableId id : queryTableIds) {
90+
selectors.isMatch(id);
91+
}
92+
}
93+
94+
/**
95+
* Benchmark to evaluate the performance of table selector without using cache.
96+
*
97+
* <p>This benchmark constructs a new {@link Selectors} instance for each invocation, simulating
98+
* a cold-start or ad-hoc usage scenario. The overhead includes pattern parsing and matcher
99+
* construction, which significantly impacts throughput.
100+
*
101+
* <p>Useful for understanding worst-case performance and comparing against the cached version.
102+
*/
103+
@Benchmark
104+
public void testSelectorWithoutCache() {
105+
Selectors freshSelectors =
106+
new Selectors.SelectorsBuilder()
107+
.includeTables(
108+
"test_wms_inventory_[a-z]+.inventory_batch_detail,"
109+
+ "test_wms_inventory_[a-z]+.inventory_batch_detail_record,"
110+
+ "test_wms_inventory_[a-z]+.inventory_batch_input,"
111+
+ "test_wms_inventory_[a-z]+.inventory_flow_volume_level,"
112+
+ "test_wms_inventory_[a-z]+.inventory_snapshot,"
113+
+ "test_wms_log_[a-z]+.log_common_log_[a-z]+")
114+
.build();
115+
for (TableId id : queryTableIds) {
116+
freshSelectors.isMatch(id);
117+
}
118+
}
119+
120+
public static void main(String[] args) throws Exception {
121+
Options options =
122+
new OptionsBuilder()
123+
.include(SelectorsBenchmark.class.getSimpleName())
124+
.detectJvmArgs()
125+
.build();
126+
new Runner(options).run();
127+
}
128+
}

0 commit comments

Comments
 (0)