diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/internal/DebeziumChangeFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/internal/DebeziumChangeFetcher.java
index 96d6a67ff87..d162372353a 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/internal/DebeziumChangeFetcher.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/internal/DebeziumChangeFetcher.java
@@ -94,13 +94,13 @@ public class DebeziumChangeFetcher {
      * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
      * record fetched into the source operator.
      */
-    private volatile long fetchDelay = 0L;
+    private volatile long fetchDelay = -1L;
 
     /**
      * emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
      * source operator.
      */
-    private volatile long emitDelay = 0L;
+    private volatile long emitDelay = -1L;
 
     /** The number of records that failed to parse or deserialize. */
     private volatile AtomicLong numRecordInErrors = new AtomicLong(0);
@@ -230,7 +230,10 @@ private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEve
         for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
             SourceRecord record = event.value();
             updateMessageTimestamp(record);
-            fetchDelay = isInDbSnapshotPhase ? 0L : processTime - messageTimestamp;
+            fetchDelay =
+                    (isInDbSnapshotPhase || messageTimestamp == 0L)
+                            ? -1L
+                            : processTime - messageTimestamp;
 
             if (isHeartbeatEvent(record)) {
                 // keep offset update
@@ -269,7 +272,9 @@ private void emitRecordsUnderCheckpointLock(
             T record;
             while ((record = records.poll()) != null) {
                 emitDelay =
-                        isInDbSnapshotPhase ? 0L : System.currentTimeMillis() - messageTimestamp;
+                        (isInDbSnapshotPhase || messageTimestamp == 0L)
+                                ? -1L
+                                : System.currentTimeMillis() - messageTimestamp;
                 sourceContext.collect(record);
             }
             // update offset to state
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBMetricCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBMetricCase.java
new file mode 100644
index 00000000000..ecb89405c0d
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBMetricCase.java
@@ -0,0 +1,268 @@
+package org.apache.flink.cdc.connectors.mongodb.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.connectors.mongodb.MongoDBSource;
+import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import com.mongodb.ConnectionString;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.Updates;
+import org.bson.Document;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
+import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** IT tests for metrics of {@link MongoDBSource}. */
+@RunWith(Parameterized.class)
+public class MongoDBMetricCase {
+    protected InMemoryReporter metricReporter = InMemoryReporter.createWithRetainedMetrics();
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDBMetricCase.class);
+
+    public MongoDBMetricCase(String mongoVersion) {
+        this.mongoContainer =
+                new MongoDBContainer("mongo:" + mongoVersion)
+                        .withSharding()
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+    }
+
+    public static String[] getMongoVersions() {
+        String specifiedMongoVersion = System.getProperty("specifiedMongoVersion");
+        if (specifiedMongoVersion != null) {
+            return new String[] {specifiedMongoVersion};
+        } else {
+            return new String[] {"6.0.16", "7.0.12"};
+        }
+    }
+
+    protected static final int DEFAULT_PARALLELISM = 4;
+
+    @Rule public final MongoDBContainer mongoContainer;
+
+    protected MongoClient mongodbClient;
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .setConfiguration(
+                                    metricReporter.addToConfiguration(new Configuration()))
+                            .build());
+
+    @Before
+    public void startContainers() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(mongoContainer)).join();
+
+        MongoClientSettings settings =
+                MongoClientSettings.builder()
+                        .applyConnectionString(
+                                new ConnectionString(mongoContainer.getConnectionString()))
+                        .build();
+        mongodbClient = MongoClients.create(settings);
+
+        LOG.info("Containers are started.");
+    }
+
+    public static final Duration TIMEOUT = Duration.ofSeconds(300);
+
+    @Parameterized.Parameters(name = "mongoVersion: {0}")
+    public static Object[] parameters() {
+        return Stream.of(getMongoVersions()).map(e -> new Object[] {e}).toArray();
+    }
+
+    @Test
+    public void testSourceMetrics() throws Exception {
+        String customerDatabase = mongoContainer.executeCommandFileInSeparateDatabase("customer");
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(200L);
+        SourceFunction<String> sourceFunction =
+                MongoDBSource.<String>builder()
+                        .hosts(mongoContainer.getHostAndPort())
+                        .username(FLINK_USER)
+                        .password(FLINK_USER_PASSWORD)
+                        .databaseList(customerDatabase)
+                        .collectionList(
+                                getCollectionNameRegex(
+                                        customerDatabase, new String[] {"customers"}))
+                        .deserializer(new JsonDebeziumDeserializationSchema())
+                        .build();
+        DataStreamSource<String> stream = env.addSource(sourceFunction, "MongoDB CDC Source");
+        CollectResultIterator<String> iterator = addCollector(env, stream);
+        JobClient jobClient = env.executeAsync();
+        iterator.setJobClient(jobClient);
+
+        // ---------------------------- Snapshot phase ------------------------------
+        // Wait until we receive all 21 snapshot records
+        int numSnapshotRecordsExpected = 21;
+        int numSnapshotRecordsReceived = 0;
+
+        while (numSnapshotRecordsReceived < numSnapshotRecordsExpected && iterator.hasNext()) {
+            iterator.next();
+            numSnapshotRecordsReceived++;
+        }
+
+        // Check metrics
+        List<OperatorMetricGroup> metricGroups =
+                metricReporter.findOperatorMetricGroups(jobClient.getJobID(), "MongoDB CDC Source");
+
+        // The source parallelism is 1, so it's safe to get the only group
+        OperatorMetricGroup group = metricGroups.get(0);
+        Map<String, Metric> metrics = metricReporter.getMetricsByGroup(group);
+
+        // numRecordsOut
+        assertEquals(
+                numSnapshotRecordsExpected,
+                group.getIOMetricGroup().getNumRecordsOutCounter().getCount());
+
+        // currentEmitEventTimeLag should be UNDEFINED during snapshot phase
+        assertTrue(metrics.containsKey(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG));
+        Gauge<Long> currentEmitEventTimeLag =
+                (Gauge<Long>) metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG);
+        assertEquals(
+                InternalSourceReaderMetricGroup.UNDEFINED,
+                (long) currentEmitEventTimeLag.getValue());
+        // currentFetchEventTimeLag should be UNDEFINED during snapshot phase
+        assertTrue(metrics.containsKey(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG));
+        Gauge<Long> currentFetchEventTimeLag =
+                (Gauge<Long>) metrics.get(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
+        assertEquals(
+                InternalSourceReaderMetricGroup.UNDEFINED,
+                (long) currentFetchEventTimeLag.getValue());
+        // sourceIdleTime should be positive (we can't know the exact value)
+        assertTrue(metrics.containsKey(MetricNames.SOURCE_IDLE_TIME));
+        Gauge<Long> sourceIdleTime = (Gauge<Long>) metrics.get(MetricNames.SOURCE_IDLE_TIME);
+        assertTrue(sourceIdleTime.getValue() > 0);
+        assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());
+
+        // --------------------------------- Binlog phase -----------------------------
+        makeFirstPartChangeStreamEvents(mongodbClient.getDatabase(customerDatabase), "customers");
+        // Wait until we receive the 4 changes made above
+        int numBinlogRecordsExpected = 4;
+        int numBinlogRecordsReceived = 0;
+        while (numBinlogRecordsReceived < numBinlogRecordsExpected && iterator.hasNext()) {
+            iterator.next();
+            numBinlogRecordsReceived++;
+        }
+
+        // Check metrics
+        // numRecordsOut
+        assertEquals(
+                numSnapshotRecordsExpected + numBinlogRecordsExpected,
+                group.getIOMetricGroup().getNumRecordsOutCounter().getCount());
+
+        // currentEmitEventTimeLag should be reasonably positive (we can't know the exact value)
+        assertTrue(currentEmitEventTimeLag.getValue() > 0);
+        assertTrue(currentEmitEventTimeLag.getValue() < TIMEOUT.toMillis());
+
+        // currentFetchEventTimeLag should be reasonably positive (we can't know the exact value)
+        assertTrue(currentFetchEventTimeLag.getValue() > 0);
+        assertTrue(currentFetchEventTimeLag.getValue() < TIMEOUT.toMillis());
+
+        // sourceIdleTime should be reasonably positive (we can't know the exact value)
+        assertTrue(sourceIdleTime.getValue() > 0);
+        assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());
+
+        jobClient.cancel().get();
+        iterator.close();
+    }
+
+    private <T> CollectResultIterator<T> addCollector(
+            StreamExecutionEnvironment env, DataStream<T> stream) {
+        TypeSerializer<T> serializer =
+                stream.getTransformation().getOutputType().createSerializer(env.getConfig());
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectSinkOperatorFactory<T> factory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
+        CollectResultIterator<T> iterator =
+                new CollectResultIterator<>(
+                        operator.getOperatorIdFuture(),
+                        serializer,
+                        accumulatorName,
+                        env.getCheckpointConfig(),
+                        10000L);
+        CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
+        sink.name("Data stream collect sink");
+        env.addOperator(sink.getTransformation());
+        return iterator;
+    }
+
+    private void makeFirstPartChangeStreamEvents(MongoDatabase mongoDatabase, String collection) {
+        MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(collection);
+        mongoCollection.updateOne(Filters.eq("cid", 101L), Updates.set("address", "Hangzhou"));
+        mongoCollection.deleteOne(Filters.eq("cid", 102L));
+        mongoCollection.insertOne(customerDocOf(102L, "user_2", "Shanghai", "123567891234"));
+        mongoCollection.updateOne(Filters.eq("cid", 103L), Updates.set("address", "Hangzhou"));
+    }
+
+    private Document customerDocOf(Long cid, String name, String address, String phoneNumber) {
+        Document document = new Document();
+        document.put("cid", cid);
+        document.put("name", name);
+        document.put("address", address);
+        document.put("phone_number", phoneNumber);
+        return document;
+    }
+
+    private String getCollectionNameRegex(String database, String[] captureCustomerCollections) {
+        checkState(captureCustomerCollections.length > 0);
+        if (captureCustomerCollections.length == 1) {
+            return database + "." + captureCustomerCollections[0];
+        } else {
+            // pattern that matches multiple collections
+            return Arrays.stream(captureCustomerCollections)
+                    .map(coll -> "^(" + database + "." + coll + ")$")
+                    .collect(Collectors.joining("|"));
+        }
+    }
+}