
[FLINK-37096][cdc-connect] Fix the delay metrics so the curve reports -1ms instead of ~20098d during the full (snapshot) phase #4034
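The oversized lag comes from records that carry no source timestamp: with messageTimestamp left at 0, the old formula fetchDelay = processTime - messageTimestamp subtracts zero from the current wall-clock time, so the gauge reports the whole Unix epoch (System.currentTimeMillis() expressed in days is roughly 20,098 as of early 2025). The patch reports -1 instead, which the metric test further down compares against InternalSourceReaderMetricGroup.UNDEFINED. Below is a minimal, self-contained sketch of that arithmetic; the class name and printed values are illustrative, not part of the patch:

/** Illustrative only: why a zero message timestamp inflates the delay gauges. */
public class DelayMetricSketch {
    public static void main(String[] args) {
        long messageTimestamp = 0L;                    // record without a source timestamp
        long processTime = System.currentTimeMillis(); // fetch time, some instant in 2025

        long oldDelay = processTime - messageTimestamp;       // old formula spans the whole epoch
        System.out.println(oldDelay / 86_400_000L + " days"); // on the order of 20,000 days

        long newDelay = (messageTimestamp == 0L) ? -1L : processTime - messageTimestamp;
        System.out.println(newDelay + " ms");                 // -1 ms, rendered as UNDEFINED
    }
}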

Open: wants to merge 2 commits into base: master
@@ -94,13 +94,13 @@ public class DebeziumChangeFetcher<T> {
* currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
* record fetched into the source operator.
*/
-    private volatile long fetchDelay = 0L;
+    private volatile long fetchDelay = -1L;

/**
* emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
* source operator.
*/
-    private volatile long emitDelay = 0L;
+    private volatile long emitDelay = -1L;

/** The number of records that failed to parse or deserialize. */
private volatile AtomicLong numRecordInErrors = new AtomicLong(0);
@@ -230,7 +230,10 @@ private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
SourceRecord record = event.value();
updateMessageTimestamp(record);
-            fetchDelay = isInDbSnapshotPhase ? 0L : processTime - messageTimestamp;
+            fetchDelay =
+                    (isInDbSnapshotPhase || messageTimestamp == 0L)
+                            ? -1L
+                            : processTime - messageTimestamp;

if (isHeartbeatEvent(record)) {
// keep offset update
@@ -269,7 +272,9 @@ private void emitRecordsUnderCheckpointLock(
T record;
while ((record = records.poll()) != null) {
emitDelay =
-                    isInDbSnapshotPhase ? 0L : System.currentTimeMillis() - messageTimestamp;
+                    (isInDbSnapshotPhase || messageTimestamp == 0L)
+                            ? -1L
+                            : System.currentTimeMillis() - messageTimestamp;
sourceContext.collect(record);
}
// update offset to state
@yuxiqian (Member) commented on Jun 3, 2025:

Add license headers to the newly committed files, and this class could be renamed to MongoDBMetricsITCase.
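For reference, the standard ASF license header used by the other Java sources in this repository; the exact text should match whatever the project's RAT/checkstyle checks expect:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */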

@@ -0,0 +1,268 @@
package org.apache.flink.cdc.connectors.mongodb.source;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.connectors.mongodb.MongoDBSource;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import org.bson.Document;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/** IT case for source metrics of {@link MongoDBSource}. */
@RunWith(Parameterized.class)
public class MongoDBMetricCase {
Comment on lines +61 to +62 (Member):

Please write test cases based on JUnit 5 and AssertJ.
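In case it helps, a minimal sketch of what the JUnit 5 / AssertJ shape of this parameterized test could look like; the class name, method source, and the placeholder assertion are illustrative, not the committed code:

package org.apache.flink.cdc.connectors.mongodb.source;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

/** Hypothetical JUnit 5 skeleton; container setup and job wiring are omitted here. */
class MongoDBMetricsITCase {

    static Stream<String> mongoVersions() {
        // Mirrors getMongoVersions(): honor -DspecifiedMongoVersion, else test both defaults.
        String specified = System.getProperty("specifiedMongoVersion");
        return specified != null ? Stream.of(specified) : Stream.of("6.0.16", "7.0.12");
    }

    @ParameterizedTest(name = "mongoVersion: {0}")
    @MethodSource("mongoVersions")
    void testSourceMetrics(String mongoVersion) {
        // ... start the MongoDB container for mongoVersion and run the job as in the diff above ...
        long numRecordsOut = 21L; // stand-in for group.getIOMetricGroup().getNumRecordsOutCounter().getCount()
        assertThat(numRecordsOut).isEqualTo(21L); // AssertJ assertions replace JUnit 4's assertEquals/assertTrue
    }
}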

protected InMemoryReporter metricReporter = InMemoryReporter.createWithRetainedMetrics();
private static final Logger LOG = LoggerFactory.getLogger(MongoDBMetricCase.class);

public MongoDBMetricCase(String mongoVersion) {
this.mongoContainer =
new MongoDBContainer("mongo:" + mongoVersion)
.withSharding()
.withLogConsumer(new Slf4jLogConsumer(LOG));
Comment on lines +67 to +70 (Member):

Consider extending MongoDBSourceTestBase instead of constructing containers manually.
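A rough sketch of that suggestion; what MongoDBSourceTestBase actually provides (shared container, MiniCluster, lifecycle hooks) is an assumption here, not something visible in this diff:

// Hypothetical shape only: the members inherited from MongoDBSourceTestBase are assumed.
public class MongoDBMetricsITCase extends MongoDBSourceTestBase {
    // The shared MongoDB container and its startup/teardown would come from the base class,
    // leaving this subclass with just the InMemoryReporter and the metric assertions.
    protected InMemoryReporter metricReporter = InMemoryReporter.createWithRetainedMetrics();
}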

}

public static String[] getMongoVersions() {
String specifiedMongoVersion = System.getProperty("specifiedMongoVersion");
if (specifiedMongoVersion != null) {
return new String[] {specifiedMongoVersion};
} else {
return new String[] {"6.0.16", "7.0.12"};
}
}

protected static final int DEFAULT_PARALLELISM = 4;

@Rule public final MongoDBContainer mongoContainer;

protected MongoClient mongodbClient;

@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.setConfiguration(
metricReporter.addToConfiguration(new Configuration()))
.build());

@Before
public void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(mongoContainer)).join();

MongoClientSettings settings =
MongoClientSettings.builder()
.applyConnectionString(
new ConnectionString(mongoContainer.getConnectionString()))
.build();
mongodbClient = MongoClients.create(settings);

LOG.info("Containers are started.");
}

public static final Duration TIMEOUT = Duration.ofSeconds(300);

@Parameterized.Parameters(name = "mongoVersion: {0}")
public static Object[] parameters() {
return Stream.of(getMongoVersions()).map(e -> new Object[] {e}).toArray();
}

@Test
public void testSourceMetrics() throws Exception {
String customerDatabase = mongoContainer.executeCommandFileInSeparateDatabase("customer");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(200L);
SourceFunction<String> sourceFunction =
MongoDBSource.<String>builder()
.hosts(mongoContainer.getHostAndPort())
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
.databaseList(customerDatabase)
.collectionList(
getCollectionNameRegex(
customerDatabase, new String[] {"customers"}))
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
DataStreamSource<String> stream = env.addSource(sourceFunction, "MongoDB CDC Source");
CollectResultIterator<String> iterator = addCollector(env, stream);
JobClient jobClient = env.executeAsync();
iterator.setJobClient(jobClient);

// ---------------------------- Snapshot phase ------------------------------
// Wait until we receive all 21 snapshot records
int numSnapshotRecordsExpected = 21;
int numSnapshotRecordsReceived = 0;

while (numSnapshotRecordsReceived < numSnapshotRecordsExpected && iterator.hasNext()) {
iterator.next();
numSnapshotRecordsReceived++;
}

// Check metrics
List<OperatorMetricGroup> metricGroups =
metricReporter.findOperatorMetricGroups(jobClient.getJobID(), "MongoDB CDC Source");

// There should be only 1 parallelism of source, so it's safe to get the only group
OperatorMetricGroup group = metricGroups.get(0);
Map<String, Metric> metrics = metricReporter.getMetricsByGroup(group);

// numRecordsOut
assertEquals(
numSnapshotRecordsExpected,
group.getIOMetricGroup().getNumRecordsOutCounter().getCount());

// currentEmitEventTimeLag should be UNDEFINED during snapshot phase
assertTrue(metrics.containsKey(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG));
Gauge<Long> currentEmitEventTimeLag =
(Gauge<Long>) metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG);
assertEquals(
InternalSourceReaderMetricGroup.UNDEFINED,
(long) currentEmitEventTimeLag.getValue());
// currentFetchEventTimeLag should be UNDEFINED during snapshot phase
assertTrue(metrics.containsKey(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG));
Gauge<Long> currentFetchEventTimeLag =
(Gauge<Long>) metrics.get(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
assertEquals(
InternalSourceReaderMetricGroup.UNDEFINED,
(long) currentFetchEventTimeLag.getValue());
// sourceIdleTime should be positive (we can't know the exact value)
assertTrue(metrics.containsKey(MetricNames.SOURCE_IDLE_TIME));
Gauge<Long> sourceIdleTime = (Gauge<Long>) metrics.get(MetricNames.SOURCE_IDLE_TIME);
assertTrue(sourceIdleTime.getValue() > 0);
assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());

// --------------------------------- Binlog phase -----------------------------
makeFirstPartChangeStreamEvents(mongodbClient.getDatabase(customerDatabase), "customers");
// Wait until we receive 4 changes made above
int numBinlogRecordsExpected = 4;
int numBinlogRecordsReceived = 0;
while (numBinlogRecordsReceived < numBinlogRecordsExpected && iterator.hasNext()) {
iterator.next();
numBinlogRecordsReceived++;
}

// Check metrics
// numRecordsOut
assertEquals(
numSnapshotRecordsExpected + numBinlogRecordsExpected,
group.getIOMetricGroup().getNumRecordsOutCounter().getCount());

// currentEmitEventTimeLag should be reasonably positive (we can't know the exact value)
assertTrue(currentEmitEventTimeLag.getValue() > 0);
assertTrue(currentEmitEventTimeLag.getValue() < TIMEOUT.toMillis());

// currentFetchEventTimeLag should be reasonably positive (we can't know the exact value)
assertTrue(currentFetchEventTimeLag.getValue() > 0);
assertTrue(currentFetchEventTimeLag.getValue() < TIMEOUT.toMillis());

// sourceIdleTime should be positive (we can't know the exact value)
assertTrue(sourceIdleTime.getValue() > 0);
assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());

jobClient.cancel().get();
iterator.close();
}

private <T> CollectResultIterator<T> addCollector(
StreamExecutionEnvironment env, DataStream<T> stream) {
TypeSerializer<T> serializer =
stream.getTransformation().getOutputType().createSerializer(env.getConfig());
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
CollectSinkOperatorFactory<T> factory =
new CollectSinkOperatorFactory<>(serializer, accumulatorName);
CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
CollectResultIterator<T> iterator =
new CollectResultIterator<>(
operator.getOperatorIdFuture(),
serializer,
accumulatorName,
env.getCheckpointConfig(),
10000L);
CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
sink.name("Data stream collect sink");
env.addOperator(sink.getTransformation());
return iterator;
}

private void makeFirstPartChangeStreamEvents(MongoDatabase mongoDatabase, String collection) {
MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(collection);
mongoCollection.updateOne(Filters.eq("cid", 101L), Updates.set("address", "Hangzhou"));
mongoCollection.deleteOne(Filters.eq("cid", 102L));
mongoCollection.insertOne(customerDocOf(102L, "user_2", "Shanghai", "123567891234"));
mongoCollection.updateOne(Filters.eq("cid", 103L), Updates.set("address", "Hangzhou"));
}

private Document customerDocOf(Long cid, String name, String address, String phoneNumber) {
Document document = new Document();
document.put("cid", cid);
document.put("name", name);
document.put("address", address);
document.put("phone_number", phoneNumber);
return document;
}

private String getCollectionNameRegex(String database, String[] captureCustomerCollections) {
checkState(captureCustomerCollections.length > 0);
if (captureCustomerCollections.length == 1) {
return database + "." + captureCustomerCollections[0];
} else {
// pattern that matches multiple collections
return Arrays.stream(captureCustomerCollections)
.map(coll -> "^(" + database + "." + coll + ")$")
.collect(Collectors.joining("|"));
}
}
}