Skip to content

Commit

Permalink
[FLINK-35298][cdc][metrics] Improve the fetch delay metrics
Browse files Browse the repository at this point in the history
This closes apache#3298.
  • Loading branch information
Shawn-Hx authored May 24, 2024
1 parent 644b5c2 commit 8e8fd30
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
/** A collection class for handling metrics in {@link IncrementalSourceReader}. */
public class SourceReaderMetrics {

public static final long UNDEFINED = -1;

private final SourceReaderMetricGroup metricGroup;

/**
* currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
* record fetched into the source operator.
*/
private volatile long fetchDelay = 0L;
private volatile long fetchDelay = UNDEFINED;

/** The total number of records that failed to be consumed, processed or emitted. */
private final Counter numRecordsInErrorsCounter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public Offset getOffsetPosition(Map<String, ?> offset) {

/**
 * Deserializes {@code element} and forwards the resulting records to {@code output}.
 *
 * <p>Before deserialization starts, the shared collector is pointed at {@code output} and
 * given the message timestamp of {@code element}; the collector reads both when it emits the
 * records produced by the deserialization schema.
 *
 * @param element the source record to deserialize
 * @param output the output that receives the deserialized records
 * @throws Exception if deserialization fails
 */
protected void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
    outputCollector.output = output;
    final Long messageTimestamp = getMessageTimestamp(element);
    outputCollector.currentMessageTimestamp = messageTimestamp;
    debeziumDeserializationSchema.deserialize(element, outputCollector);
}

Expand All @@ -170,10 +171,21 @@ protected void reportMetrics(SourceRecord element) {

private static class OutputCollector<T> implements Collector<T> {
private SourceOutput<T> output;
private Long currentMessageTimestamp;

/**
 * Forwards a deserialized record to the current {@link SourceOutput} exactly once.
 *
 * <p>The record is emitted with {@code currentMessageTimestamp} when that timestamp is
 * non-null and positive, and without a timestamp otherwise. There is deliberately no
 * unconditional {@code output.collect(record)} ahead of the branch: it would emit every
 * record twice.
 *
 * @param record the deserialized record to emit
 */
@Override
public void collect(T record) {
    if (currentMessageTimestamp != null && currentMessageTimestamp > 0) {
        // Only binlog events contain a valid timestamp. We use the output with timestamp to
        // report the event time and let the source operator report
        // "currentEmitEventTimeLag" correctly.
        output.collect(record, currentMessageTimestamp);
    } else {
        // Records in snapshot mode have a zero timestamp in the message. We use the output
        // without timestamp to collect the record. Metric "currentEmitEventTimeLag" will
        // not be updated in the source operator in this case.
        output.collect(record);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
/** A collection class for handling metrics in {@link MySqlSourceReader}. */
public class MySqlSourceReaderMetrics {

public static final long UNDEFINED = -1;

private final MetricGroup metricGroup;

/**
* currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
* record fetched into the source operator.
*/
private volatile long fetchDelay = 0L;
private volatile long fetchDelay = UNDEFINED;

public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ private void updateStartingOffsetForSplit(MySqlSplitState splitState, SourceReco

/**
 * Deserializes {@code element} and forwards the resulting records to {@code output}.
 *
 * <p>Before deserialization starts, the shared collector is pointed at {@code output} and
 * given the message timestamp of {@code element}; the collector reads both when it emits the
 * records produced by the deserialization schema.
 *
 * @param element the source record to deserialize
 * @param output the output that receives the deserialized records
 * @throws Exception if deserialization fails
 */
private void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
    outputCollector.output = output;
    final Long messageTimestamp = RecordUtils.getMessageTimestamp(element);
    outputCollector.currentMessageTimestamp = messageTimestamp;
    debeziumDeserializationSchema.deserialize(element, outputCollector);
}

Expand All @@ -135,10 +136,21 @@ private void reportMetrics(SourceRecord element) {

private static class OutputCollector<T> implements Collector<T> {
private SourceOutput<T> output;
private Long currentMessageTimestamp;

/**
 * Forwards a deserialized record to the current {@link SourceOutput} exactly once.
 *
 * <p>The record is emitted with {@code currentMessageTimestamp} when that timestamp is
 * non-null and positive, and without a timestamp otherwise. There is deliberately no
 * unconditional {@code output.collect(record)} ahead of the branch: it would emit every
 * record twice.
 *
 * @param record the deserialized record to emit
 */
@Override
public void collect(T record) {
    if (currentMessageTimestamp != null && currentMessageTimestamp > 0) {
        // Only binlog events contain a valid timestamp. We use the output with timestamp to
        // report the event time and let the source operator report
        // "currentEmitEventTimeLag" correctly.
        output.collect(record, currentMessageTimestamp);
    } else {
        // Records in snapshot mode have a zero timestamp in the message. We use the output
        // without timestamp to collect the record. Metric "currentEmitEventTimeLag" will
        // not be updated in the source operator in this case.
        output.collect(record);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHook;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
Expand All @@ -31,9 +32,16 @@
import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.table.MetadataConverter;
import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
Expand Down Expand Up @@ -95,13 +103,16 @@
import static java.lang.String.format;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/** IT tests for {@link MySqlSource}. */
@RunWith(Parameterized.class)
public class MySqlSourceITCase extends MySqlSourceTestBase {

/** Upper bound for a single test; also used as a sanity bound for time-based metric values. */
public static final Duration TIMEOUT = Duration.ofSeconds(300);

/** Fails any test that runs longer than {@link #TIMEOUT}. */
@Rule public final Timeout timeoutPerTest = Timeout.seconds(TIMEOUT.getSeconds());

private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
private final UniqueDatabase customDatabase =
Expand Down Expand Up @@ -686,6 +697,123 @@ private void testStartingOffset(
}
}

/**
 * Verifies the metrics exposed by the MySQL CDC source in the snapshot phase and in the
 * binlog phase.
 *
 * <p>Snapshot phase: {@code numRecordsOut} equals the number of snapshot records, both
 * event-time lag gauges still report their UNDEFINED value, and {@code sourceIdleTime} is
 * positive. Binlog phase: {@code numRecordsOut} additionally counts the binlog changes and
 * all three gauges report a positive value below {@link #TIMEOUT}.
 *
 * <p>The job is cancelled and the collect iterator closed in a {@code finally} block, so a
 * failing assertion does not leave them running.
 */
@SuppressWarnings("unchecked")
@Test
public void testSourceMetrics() throws Exception {
    customDatabase.createAndInitialize();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    MySqlSource<String> source =
            MySqlSource.<String>builder()
                    .hostname(MYSQL_CONTAINER.getHost())
                    .port(MYSQL_CONTAINER.getDatabasePort())
                    .databaseList(customDatabase.getDatabaseName())
                    .tableList(customDatabase.getDatabaseName() + ".customers")
                    .username(customDatabase.getUsername())
                    .password(customDatabase.getPassword())
                    .deserializer(new StringDebeziumDeserializationSchema())
                    .serverId(getServerId())
                    .build();
    DataStreamSource<String> stream =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
    CollectResultIterator<String> iterator = addCollector(env, stream);
    JobClient jobClient = env.executeAsync();
    iterator.setJobClient(jobClient);

    try {
        // ---------------------------- Snapshot phase ------------------------------
        // Wait until we receive all 21 snapshot records
        int numSnapshotRecordsExpected = 21;
        int numSnapshotRecordsReceived = 0;
        while (numSnapshotRecordsReceived < numSnapshotRecordsExpected && iterator.hasNext()) {
            iterator.next();
            numSnapshotRecordsReceived++;
        }
        // Fail with a clear message if the stream ended before all snapshot records arrived
        assertEquals(
                "Stream ended before all snapshot records were received",
                numSnapshotRecordsExpected,
                numSnapshotRecordsReceived);

        // Check metrics
        List<OperatorMetricGroup> metricGroups =
                metricReporter.findOperatorMetricGroups(
                        jobClient.getJobID(), "MySQL CDC Source");
        // There should be only 1 parallelism of source, so it's safe to get the only group
        OperatorMetricGroup group = metricGroups.get(0);
        Map<String, Metric> metrics = metricReporter.getMetricsByGroup(group);

        // numRecordsOut
        assertEquals(
                numSnapshotRecordsExpected,
                group.getIOMetricGroup().getNumRecordsOutCounter().getCount());

        // currentEmitEventTimeLag should be UNDEFINED during snapshot phase
        assertTrue(metrics.containsKey(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG));
        Gauge<Long> currentEmitEventTimeLag =
                (Gauge<Long>) metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG);
        assertEquals(
                InternalSourceReaderMetricGroup.UNDEFINED,
                (long) currentEmitEventTimeLag.getValue());

        // currentFetchEventTimeLag should be UNDEFINED during snapshot phase
        assertTrue(metrics.containsKey(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG));
        Gauge<Long> currentFetchEventTimeLag =
                (Gauge<Long>) metrics.get(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
        assertEquals(
                MySqlSourceReaderMetrics.UNDEFINED, (long) currentFetchEventTimeLag.getValue());

        // sourceIdleTime should be positive (we can't know the exact value)
        assertTrue(metrics.containsKey(MetricNames.SOURCE_IDLE_TIME));
        Gauge<Long> sourceIdleTime = (Gauge<Long>) metrics.get(MetricNames.SOURCE_IDLE_TIME);
        assertTrue(sourceIdleTime.getValue() > 0);
        assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());

        // --------------------------------- Binlog phase -----------------------------
        makeFirstPartBinlogEvents(
                getConnection(), customDatabase.qualifiedTableName("customers"));
        // Wait until we receive 4 changes made above
        int numBinlogRecordsExpected = 4;
        int numBinlogRecordsReceived = 0;
        while (numBinlogRecordsReceived < numBinlogRecordsExpected && iterator.hasNext()) {
            iterator.next();
            numBinlogRecordsReceived++;
        }
        // Fail with a clear message if the stream ended before all binlog records arrived
        assertEquals(
                "Stream ended before all binlog records were received",
                numBinlogRecordsExpected,
                numBinlogRecordsReceived);

        // Check metrics
        // numRecordsOut
        assertEquals(
                numSnapshotRecordsExpected + numBinlogRecordsExpected,
                group.getIOMetricGroup().getNumRecordsOutCounter().getCount());

        // currentEmitEventTimeLag should be reasonably positive (we can't know the exact value)
        assertTrue(currentEmitEventTimeLag.getValue() > 0);
        assertTrue(currentEmitEventTimeLag.getValue() < TIMEOUT.toMillis());

        // currentFetchEventTimeLag should be reasonably positive (we can't know the exact
        // value)
        assertTrue(currentFetchEventTimeLag.getValue() > 0);
        assertTrue(currentFetchEventTimeLag.getValue() < TIMEOUT.toMillis());

        // sourceIdleTime should be reasonably positive (we can't know the exact value)
        assertTrue(sourceIdleTime.getValue() > 0);
        assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());
    } finally {
        // Always stop the job and release the iterator, even when an assertion above failed
        try {
            jobClient.cancel().get();
        } finally {
            iterator.close();
        }
    }
}

/**
 * Attaches a collect sink to {@code stream} and returns an iterator for reading its results.
 *
 * <p>The sink is registered on {@code env} under the name "Data stream collect sink" and uses
 * a randomly generated accumulator name. The caller still has to execute the job and hand the
 * resulting job client to the returned iterator.
 *
 * @param env the environment the sink transformation is added to
 * @param stream the stream whose records should be collected
 * @param <T> the record type of the stream
 * @return an iterator bound to the newly added collect sink
 */
private <T> CollectResultIterator<T> addCollector(
        StreamExecutionEnvironment env, DataStream<T> stream) {
    final TypeSerializer<T> typeSerializer =
            stream.getTransformation().getOutputType().createSerializer(env.getConfig());
    final String accumulator = "dataStreamCollect_" + UUID.randomUUID();
    final CollectSinkOperatorFactory<T> sinkFactory =
            new CollectSinkOperatorFactory<>(typeSerializer, accumulator);
    final CollectSinkOperator<T> sinkOperator =
            (CollectSinkOperator<T>) sinkFactory.getOperator();

    final CollectResultIterator<T> results =
            new CollectResultIterator<>(
                    sinkOperator.getOperatorIdFuture(),
                    typeSerializer,
                    accumulator,
                    env.getCheckpointConfig());

    final CollectStreamSink<T> collectSink = new CollectStreamSink<>(stream, sinkFactory);
    collectSink.name("Data stream collect sink");
    env.addOperator(collectSink.getTransformation());

    return results;
}

private MySqlSource<RowData> buildSleepingSource() {
ResolvedSchema physicalSchema =
new ResolvedSchema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.MiniClusterWithClientResource;
Expand Down Expand Up @@ -52,6 +54,7 @@ public abstract class MySqlSourceTestBase extends TestLogger {

protected static final int DEFAULT_PARALLELISM = 4;
protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7);
protected InMemoryReporter metricReporter = InMemoryReporter.createWithRetainedMetrics();

@Rule
public final MiniClusterWithClientResource miniClusterResource =
Expand All @@ -61,6 +64,8 @@ public abstract class MySqlSourceTestBase extends TestLogger {
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.setConfiguration(
metricReporter.addToConfiguration(new Configuration()))
.build());

@BeforeClass
Expand Down

0 comments on commit 8e8fd30

Please sign in to comment.