Skip to content

Commit e6e1fa7

Browse files
committed
[FLINK-37676][cdc-common] Add caching mechanism to Selectors for improved performance
1 parent 7a57f55 commit e6e1fa7

File tree

3 files changed

+186
-1
lines changed

3 files changed

+186
-1
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
import org.apache.flink.cdc.common.event.TableId;
2121
import org.apache.flink.cdc.common.utils.Predicates;
2222

23+
import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
24+
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
25+
26+
import java.time.Duration;
2327
import java.util.ArrayList;
2428
import java.util.Iterator;
2529
import java.util.List;
@@ -29,8 +33,16 @@
2933
/** Selectors for filtering tables. */
3034
public class Selectors {
3135

36+
private static final Duration CACHE_EXPIRE_DURATION = Duration.ofHours(1);
37+
3238
private List<Selector> selectors;
3339

40+
private final Cache<TableId, Boolean> cache =
41+
CacheBuilder.newBuilder()
42+
.expireAfterAccess(CACHE_EXPIRE_DURATION)
43+
.maximumSize(1024)
44+
.build();
45+
3446
private Selectors() {}
3547

3648
/**
@@ -71,8 +83,20 @@ public boolean isMatch(TableId tableId) {
7183
}
7284
}
7385

74-
/** Match the {@link TableId} against the {@link Selector}s. * */
86+
/** Match the {@link TableId} against the {@link Selector}s. */
7587
public boolean isMatch(TableId tableId) {
88+
Boolean cachedResult = cache.getIfPresent(tableId);
89+
if (cachedResult != null) {
90+
return cachedResult;
91+
}
92+
93+
boolean match = computeIsMatch(tableId);
94+
cache.put(tableId, match);
95+
return match;
96+
}
97+
98+
/** Computes the match result if not present in the cache. */
99+
private boolean computeIsMatch(TableId tableId) {
76100
for (Selector selector : selectors) {
77101
if (selector.isMatch(tableId)) {
78102
return true;

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ limitations under the License.
4242
<maven.plugin.download.version>1.6.8</maven.plugin.download.version>
4343
<iceberg.version>1.6.1</iceberg.version>
4444
<hive.version>2.3.9</hive.version>
45+
<jmh.version>1.37</jmh.version>
4546
</properties>
4647

4748
<dependencies>
@@ -205,6 +206,21 @@ limitations under the License.
205206
<scope>test</scope>
206207
</dependency>
207208

209+
<!-- benchmark -->
210+
<dependency>
211+
<groupId>org.openjdk.jmh</groupId>
212+
<artifactId>jmh-core</artifactId>
213+
<version>${jmh.version}</version>
214+
<scope>test</scope>
215+
</dependency>
216+
217+
<dependency>
218+
<groupId>org.openjdk.jmh</groupId>
219+
<artifactId>jmh-generator-annprocess</artifactId>
220+
<version>${jmh.version}</version>
221+
<scope>test</scope>
222+
</dependency>
223+
208224
<!-- This is for testing Scala UDF.-->
209225
<dependency>
210226
<groupId>org.scala-lang</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.pipeline.tests.benchmark;
19+
20+
import org.apache.flink.cdc.common.event.TableId;
21+
import org.apache.flink.cdc.common.schema.Selectors;
22+
23+
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
24+
25+
import org.openjdk.jmh.annotations.Benchmark;
26+
import org.openjdk.jmh.annotations.BenchmarkMode;
27+
import org.openjdk.jmh.annotations.Fork;
28+
import org.openjdk.jmh.annotations.Level;
29+
import org.openjdk.jmh.annotations.Measurement;
30+
import org.openjdk.jmh.annotations.Mode;
31+
import org.openjdk.jmh.annotations.OutputTimeUnit;
32+
import org.openjdk.jmh.annotations.Scope;
33+
import org.openjdk.jmh.annotations.Setup;
34+
import org.openjdk.jmh.annotations.State;
35+
import org.openjdk.jmh.annotations.Threads;
36+
import org.openjdk.jmh.annotations.Warmup;
37+
import org.openjdk.jmh.runner.Runner;
38+
import org.openjdk.jmh.runner.options.Options;
39+
import org.openjdk.jmh.runner.options.OptionsBuilder;
40+
41+
import java.util.List;
42+
import java.util.concurrent.TimeUnit;
43+
44+
/**
45+
* Benchmark for table selector performance with and without cache.
46+
*
47+
* <pre>
48+
* Benchmark Mode Cnt Score Error Units
49+
* SelectorsBenchmark.testSelectorWithCache thrpt 20 1028.979 ± 218.663 ops/ms
50+
* SelectorsBenchmark.testSelectorWithoutCache thrpt 20 136.747 ± 11.872 ops/ms
51+
* </pre>
52+
*/
53+
@BenchmarkMode(Mode.Throughput)
54+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
55+
@State(Scope.Benchmark)
56+
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
57+
@Measurement(iterations = 10, time = 2, timeUnit = TimeUnit.SECONDS)
58+
@Fork(2)
59+
@Threads(2)
60+
public class SelectorsBenchmark {
61+
62+
private Selectors selectors;
63+
private List<TableId> queryTableIds;
64+
65+
@Setup(Level.Trial)
66+
public void setup() {
67+
selectors =
68+
new Selectors.SelectorsBuilder()
69+
.includeTables(
70+
"test_wms_inventory_[a-z]+.inventory_batch_detail,"
71+
+ "test_wms_inventory_[a-z]+.inventory_batch_detail_record,"
72+
+ "test_wms_inventory_[a-z]+.inventory_batch_input,"
73+
+ "test_wms_inventory_[a-z]+.inventory_flow_volume_level,"
74+
+ "test_wms_inventory_[a-z]+.inventory_snapshot,"
75+
+ "test_wms_log_[a-z]+.log_common_log_[a-z]+")
76+
.build();
77+
78+
queryTableIds =
79+
ImmutableList.of(
80+
TableId.tableId(
81+
"test_wms_common_europe.occupy_strategy_exe_progress_order"),
82+
TableId.tableId("test_wms_common_europe.wave_strategy_rule_relation"),
83+
TableId.tableId("db.sc2.A1"),
84+
TableId.tableId("db2.sc2.A1"),
85+
TableId.tableId("test_wms_output_s.out_moment_storage_location_relation"),
86+
TableId.tableId("test_wms_output_a.out_moment_storage_location_relation"));
87+
88+
// warm up cache
89+
for (TableId id : queryTableIds) {
90+
selectors.isMatch(id);
91+
}
92+
}
93+
94+
/**
95+
* Benchmark to evaluate the performance of table selector with caching enabled.
96+
*
97+
* <p>This benchmark measures throughput when using a pre-built {@link Selectors} instance that
98+
* leverages internal caching mechanisms. This simulates a typical usage scenario where selector
99+
* rules are initialized once and reused across multiple queries.
100+
*
101+
* <p>Expected to perform significantly better than non-cached version due to avoidance of
102+
* repeated regex parsing and compilation.
103+
*/
104+
@Benchmark
105+
public void testSelectorWithCache() {
106+
for (TableId id : queryTableIds) {
107+
selectors.isMatch(id);
108+
}
109+
}
110+
111+
/**
112+
* Benchmark to evaluate the performance of table selector without using cache.
113+
*
114+
* <p>This benchmark constructs a new {@link Selectors} instance for each invocation, simulating
115+
* a cold-start or ad-hoc usage scenario. The overhead includes pattern parsing and matcher
116+
* construction, which significantly impacts throughput.
117+
*
118+
* <p>Useful for understanding worst-case performance and comparing against the cached version.
119+
*/
120+
@Benchmark
121+
public void testSelectorWithoutCache() {
122+
Selectors freshSelectors =
123+
new Selectors.SelectorsBuilder()
124+
.includeTables(
125+
"test_wms_inventory_[a-z]+.inventory_batch_detail,"
126+
+ "test_wms_inventory_[a-z]+.inventory_batch_detail_record,"
127+
+ "test_wms_inventory_[a-z]+.inventory_batch_input,"
128+
+ "test_wms_inventory_[a-z]+.inventory_flow_volume_level,"
129+
+ "test_wms_inventory_[a-z]+.inventory_snapshot,"
130+
+ "test_wms_log_[a-z]+.log_common_log_[a-z]+")
131+
.build();
132+
for (TableId id : queryTableIds) {
133+
freshSelectors.isMatch(id);
134+
}
135+
}
136+
137+
public static void main(String[] args) throws Exception {
138+
Options options =
139+
new OptionsBuilder()
140+
.include(SelectorsBenchmark.class.getSimpleName())
141+
.detectJvmArgs()
142+
.build();
143+
new Runner(options).run();
144+
}
145+
}

0 commit comments

Comments
 (0)