[FLINK-37096][cdc-connect] Fix the delay curve value reported during the full phase: change it from 20098d to -1ms. #4034
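Context for the fix: during the full (snapshot) phase no change-stream event timestamp is available yet, so a lag computed as "now minus 0" evaluates to roughly the time elapsed since the Unix epoch, which is where a value around 20098 days comes from. The change reports the sentinel -1 ms (Flink's InternalSourceReaderMetricGroup.UNDEFINED) instead, until streaming records arrive. Below is a minimal, hypothetical sketch of that guard; the class and field names are illustrative and not the connector's actual code.

// Hypothetical sketch: report UNDEFINED (-1 ms) while no streamed event has been seen,
// instead of computing a lag against a default timestamp of 0.
public class EmitLagSketch {
    // Sentinel meaning "no value yet", matching InternalSourceReaderMetricGroup.UNDEFINED.
    private static final long UNDEFINED = -1L;

    // Timestamp of the last streamed change event; 0 means none has been seen yet.
    private volatile long lastEventTimestampMs = 0L;

    // Gauge body: only report a real lag once a streamed event has arrived.
    public long currentEmitEventTimeLag() {
        return lastEventTimestampMs <= 0L
                ? UNDEFINED
                : System.currentTimeMillis() - lastEventTimestampMs;
    }
}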
Open
Sukang1002 wants to merge 2 commits into apache:master from Sukang1002:37096 (base: master)
Changes from all commits (2 commits)
268 changes: 268 additions & 0 deletions
...b-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBMetricCase.java
@@ -0,0 +1,268 @@
package org.apache.flink.cdc.connectors.mongodb.source;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.connectors.mongodb.MongoDBSource;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import org.bson.Document;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/** IT tests for {@link MongoDBSource}. */
@RunWith(Parameterized.class)
public class MongoDBMetricCase {
Review comment on lines +61 to +62: Please write test cases based on JUnit 5 and AssertJ.
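A sketch of what the requested style could look like for one of the snapshot-phase checks, assuming junit-jupiter and assertj-core are already on the test classpath; the class name and the hard-coded values are placeholders, not part of this PR.

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

// Illustrative JUnit 5 + AssertJ variant of an assertion from this test.
class MongoDBMetricCaseJUnit5Sketch {

    @Test
    void snapshotPhaseLagShouldBeUndefined() {
        // Placeholder values standing in for what the test reads from the metric reporter.
        long numRecordsOut = 21L;
        long currentEmitEventTimeLag = -1L;

        // AssertJ's fluent assertions replace assertEquals/assertTrue.
        assertThat(numRecordsOut).isEqualTo(21L);
        assertThat(currentEmitEventTimeLag)
                .as("currentEmitEventTimeLag should be UNDEFINED during the snapshot phase")
                .isEqualTo(-1L);
    }
}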

    protected InMemoryReporter metricReporter = InMemoryReporter.createWithRetainedMetrics();
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBMetricCase.class);

    public MongoDBMetricCase(String mongoVersion) {
        this.mongoContainer =
                new MongoDBContainer("mongo:" + mongoVersion)
                        .withSharding()
                        .withLogConsumer(new Slf4jLogConsumer(LOG));
Review comment on lines +67 to +70: Consider extending …
    }

    public static String[] getMongoVersions() {
        String specifiedMongoVersion = System.getProperty("specifiedMongoVersion");
        if (specifiedMongoVersion != null) {
            return new String[] {specifiedMongoVersion};
        } else {
            return new String[] {"6.0.16", "7.0.12"};
        }
    }

    protected static final int DEFAULT_PARALLELISM = 4;

    @Rule public final MongoDBContainer mongoContainer;

    protected MongoClient mongodbClient;

    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
                            .withHaLeadershipControl()
                            .setConfiguration(
                                    metricReporter.addToConfiguration(new Configuration()))
                            .build());

    @Before
    public void startContainers() {
        LOG.info("Starting containers...");
        Startables.deepStart(Stream.of(mongoContainer)).join();

        MongoClientSettings settings =
                MongoClientSettings.builder()
                        .applyConnectionString(
                                new ConnectionString(mongoContainer.getConnectionString()))
                        .build();
        mongodbClient = MongoClients.create(settings);

        LOG.info("Containers are started.");
    }

    public static final Duration TIMEOUT = Duration.ofSeconds(300);

    @Parameterized.Parameters(name = "mongoVersion: {0}")
    public static Object[] parameters() {
        return Stream.of(getMongoVersions()).map(e -> new Object[] {e}).toArray();
    }

    @Test
    public void testSourceMetrics() throws Exception {
        String customerDatabase = mongoContainer.executeCommandFileInSeparateDatabase("customer");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(200L);
        SourceFunction<String> sourceFunction =
                MongoDBSource.<String>builder()
                        .hosts(mongoContainer.getHostAndPort())
                        .username(FLINK_USER)
                        .password(FLINK_USER_PASSWORD)
                        .databaseList(customerDatabase)
                        .collectionList(
                                getCollectionNameRegex(
                                        customerDatabase, new String[] {"customers"}))
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();
        DataStreamSource<String> stream = env.addSource(sourceFunction, "MongoDB CDC Source");
        CollectResultIterator<String> iterator = addCollector(env, stream);
        JobClient jobClient = env.executeAsync();
        iterator.setJobClient(jobClient);

        // ---------------------------- Snapshot phase ------------------------------
        // Wait until we receive all 21 snapshot records
        int numSnapshotRecordsExpected = 21;
        int numSnapshotRecordsReceived = 0;

        while (numSnapshotRecordsReceived < numSnapshotRecordsExpected && iterator.hasNext()) {
            iterator.next();
            numSnapshotRecordsReceived++;
        }

        // Check metrics
        List<OperatorMetricGroup> metricGroups =
                metricReporter.findOperatorMetricGroups(jobClient.getJobID(), "MongoDB CDC Source");

        // There should be only 1 parallelism of source, so it's safe to get the only group
        OperatorMetricGroup group = metricGroups.get(0);
        Map<String, Metric> metrics = metricReporter.getMetricsByGroup(group);

        // numRecordsOut
        assertEquals(
                numSnapshotRecordsExpected,
                group.getIOMetricGroup().getNumRecordsOutCounter().getCount());

        // currentEmitEventTimeLag should be UNDEFINED during snapshot phase
        assertTrue(metrics.containsKey(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG));
        Gauge<Long> currentEmitEventTimeLag =
                (Gauge<Long>) metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG);
        assertEquals(
                InternalSourceReaderMetricGroup.UNDEFINED,
                (long) currentEmitEventTimeLag.getValue());
        // currentFetchEventTimeLag should be UNDEFINED during snapshot phase
        assertTrue(metrics.containsKey(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG));
        Gauge<Long> currentFetchEventTimeLag =
                (Gauge<Long>) metrics.get(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
        assertEquals(
                InternalSourceReaderMetricGroup.UNDEFINED,
                (long) currentFetchEventTimeLag.getValue());
        // sourceIdleTime should be positive (we can't know the exact value)
        assertTrue(metrics.containsKey(MetricNames.SOURCE_IDLE_TIME));
        Gauge<Long> sourceIdleTime = (Gauge<Long>) metrics.get(MetricNames.SOURCE_IDLE_TIME);
        assertTrue(sourceIdleTime.getValue() > 0);
        assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());

        // --------------------------------- Binlog phase -----------------------------
        makeFirstPartChangeStreamEvents(mongodbClient.getDatabase(customerDatabase), "customers");
        // Wait until we receive 4 changes made above
        int numBinlogRecordsExpected = 4;
        int numBinlogRecordsReceived = 0;
        while (numBinlogRecordsReceived < numBinlogRecordsExpected && iterator.hasNext()) {
            iterator.next();
            numBinlogRecordsReceived++;
        }

        // Check metrics
        // numRecordsOut
        assertEquals(
                numSnapshotRecordsExpected + numBinlogRecordsExpected,
                group.getIOMetricGroup().getNumRecordsOutCounter().getCount());

        // currentEmitEventTimeLag should be reasonably positive (we can't know the exact value)
        assertTrue(currentEmitEventTimeLag.getValue() > 0);
        assertTrue(currentEmitEventTimeLag.getValue() < TIMEOUT.toMillis());

        // currentFetchEventTimeLag should be reasonably positive (we can't know the exact value)
        assertTrue(currentFetchEventTimeLag.getValue() > 0);
        assertTrue(currentFetchEventTimeLag.getValue() < TIMEOUT.toMillis());

        // sourceIdleTime should be positive (we can't know the exact value)
        assertTrue(sourceIdleTime.getValue() > 0);
        assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());

        jobClient.cancel().get();
        iterator.close();
    }

    private <T> CollectResultIterator<T> addCollector(
            StreamExecutionEnvironment env, DataStream<T> stream) {
        TypeSerializer<T> serializer =
                stream.getTransformation().getOutputType().createSerializer(env.getConfig());
        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
        CollectSinkOperatorFactory<T> factory =
                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
        CollectResultIterator<T> iterator =
                new CollectResultIterator<>(
                        operator.getOperatorIdFuture(),
                        serializer,
                        accumulatorName,
                        env.getCheckpointConfig(),
                        10000L);
        CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
        sink.name("Data stream collect sink");
        env.addOperator(sink.getTransformation());
        return iterator;
    }

    private void makeFirstPartChangeStreamEvents(MongoDatabase mongoDatabase, String collection) {
        MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(collection);
        mongoCollection.updateOne(Filters.eq("cid", 101L), Updates.set("address", "Hangzhou"));
        mongoCollection.deleteOne(Filters.eq("cid", 102L));
        mongoCollection.insertOne(customerDocOf(102L, "user_2", "Shanghai", "123567891234"));
        mongoCollection.updateOne(Filters.eq("cid", 103L), Updates.set("address", "Hangzhou"));
    }

    private Document customerDocOf(Long cid, String name, String address, String phoneNumber) {
        Document document = new Document();
        document.put("cid", cid);
        document.put("name", name);
        document.put("address", address);
        document.put("phone_number", phoneNumber);
        return document;
    }

    private String getCollectionNameRegex(String database, String[] captureCustomerCollections) {
        checkState(captureCustomerCollections.length > 0);
        if (captureCustomerCollections.length == 1) {
            return database + "." + captureCustomerCollections[0];
        } else {
            // pattern that matches multiple collections
            return Arrays.stream(captureCustomerCollections)
                    .map(coll -> "^(" + database + "." + coll + ")$")
                    .collect(Collectors.joining("|"));
        }
    }
}
Review comment: Add license headers for the newly committed files; the class could also be renamed to MongoDBMetricsITCase.
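For reference, a sketch of the license header in question: this is the standard ASF header carried by other Flink source files (verify the exact text against an existing file in the repository before committing), placed above the package declaration of the new test class.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */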