Skip to content

Commit

Permalink
fix(issue2004): fix AutoBalancerMetricsReporter cannot process with t… (
Browse files Browse the repository at this point in the history
#2005)

fix(issue2004): fix AutoBalancerMetricsReporter failing to process topic names that contain a period

Signed-off-by: Shichao Nie <niesc@automq.com>
  • Loading branch information
SCNieh authored Sep 10, 2024
1 parent 2b5e703 commit 5097a25
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;

/**
Expand Down Expand Up @@ -123,7 +124,7 @@ public static AutoBalancerMetrics toAutoBalancerMetric(long nowMs,
com.yammer.metrics.core.MetricName metricName,
double value,
String attribute) {
Map<String, String> tags = yammerMetricScopeToTags(metricName.getScope());
Map<String, String> tags = mBeanNameToTags(metricName.getMBeanName());
AutoBalancerMetrics ccm = tags == null ? null : toAutoBalancerMetric(nowMs, brokerId, brokerRack, metricName.getName(), tags, value, attribute);
if (ccm == null) {
throw new IllegalArgumentException(String.format("Cannot convert yammer metric %s to a Auto Balancer metric for "
Expand Down Expand Up @@ -230,6 +231,38 @@ public static Map<String, String> yammerMetricScopeToTags(String scope) {
}
}

/**
 * Extracts the tag key/value pairs embedded in a Yammer metric MBean name.
 *
 * <p>The name is expected to look like
 * "group:type=ClassName,name=MetricName,tag1=value1,tag2=value2,...", which is the layout
 * {@code org.apache.kafka.server.metrics.KafkaMetricsGroup} is relied upon to produce.
 * The name is cut at every "," and each piece at every "="; every piece that comes after the
 * "name=MetricName" piece is treated as a tag. Pieces up to and including the "name" piece are
 * validated but not returned.
 *
 * @param mbeanName MBean name for the Yammer metric.
 * @return an empty map when {@code mbeanName} is blank according to {@code Utils.isBlank};
 *         {@code null} when any comma-separated piece does not split into exactly one key and
 *         one value around "="; otherwise the tags found after the "name" piece (empty if no
 *         "name" piece is present or nothing follows it).
 */
public static Map<String, String> mBeanNameToTags(String mbeanName) {
    // Guard clause: nothing to parse.
    if (Utils.isBlank(mbeanName)) {
        return Collections.emptyMap();
    }

    Map<String, String> parsedTags = new HashMap<>();
    boolean nameSeen = false;
    for (String segment : mbeanName.split(",")) {
        String[] keyValue = segment.split("=");
        // Every segment, including the ones before the tags, must be a single key=value pair.
        if (keyValue.length != 2) {
            return null;
        }
        if (nameSeen) {
            parsedTags.put(keyValue[0], keyValue[1]);
        } else if (keyValue[0].equals("name")) {
            // Tags begin with the segment that follows this one.
            nameSeen = true;
        }
    }
    return parsedTags;
}

/**
* Check if tags are valid for a topic-partition metric
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package kafka.autobalancer.metricsreporter.metric;

import com.yammer.metrics.core.MetricName;
import java.util.Collections;
import java.util.Map;
import kafka.autobalancer.common.types.RawMetricTypes;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand All @@ -35,4 +39,41 @@ public void testSanityCheckTopicPartitionMetricsCompleteness() {
metrics.put(RawMetricTypes.PARTITION_SIZE, 10);
Assertions.assertTrue(MetricsUtils.sanityCheckTopicPartitionMetricsCompleteness(metrics));
}

/**
 * Checks that a Yammer metric name carrying "topic" and "partition" tags is converted into a
 * {@code TopicPartitionMetrics}, both when the topic name contains periods and when it does not,
 * and that a metric name without tags is rejected while still parsing to an empty tag map.
 */
@Test
public void testToAutoBalancerMetric() {
    final KafkaMetricsGroup group = new KafkaMetricsGroup(MetricsUtilsTest.class);
    final long timestampMs = System.currentTimeMillis();
    final int broker = 0;
    final String brokerRack = "rack-a";

    // Scenario 1: topic name that contains periods.
    final MetricName dottedName = group.metricName(MetricsUtils.BYTES_IN_PER_SEC, Map.of(
        "topic", "topic.with.dot",
        "partition", "0"
    ));
    final AutoBalancerMetrics dottedMetrics =
        MetricsUtils.toAutoBalancerMetric(timestampMs, broker, brokerRack, dottedName, 10.5);
    Assertions.assertNotNull(dottedMetrics);
    Assertions.assertInstanceOf(TopicPartitionMetrics.class, dottedMetrics);
    Assertions.assertEquals(10.5, dottedMetrics.getMetricValueMap().get(RawMetricTypes.PARTITION_BYTES_IN));
    final TopicPartitionMetrics dottedPartition = (TopicPartitionMetrics) dottedMetrics;
    Assertions.assertEquals("topic.with.dot", dottedPartition.topic());
    Assertions.assertEquals(0, dottedPartition.partition());

    // Scenario 2: topic name without any period.
    final MetricName plainName = group.metricName(MetricsUtils.BYTES_IN_PER_SEC, Map.of(
        "topic", "topic_without_dot",
        "partition", "1"
    ));
    final AutoBalancerMetrics plainMetrics =
        MetricsUtils.toAutoBalancerMetric(timestampMs, broker, brokerRack, plainName, 10.5);
    Assertions.assertNotNull(plainMetrics);
    Assertions.assertInstanceOf(TopicPartitionMetrics.class, plainMetrics);
    Assertions.assertEquals(10.5, plainMetrics.getMetricValueMap().get(RawMetricTypes.PARTITION_BYTES_IN));
    final TopicPartitionMetrics plainPartition = (TopicPartitionMetrics) plainMetrics;
    Assertions.assertEquals("topic_without_dot", plainPartition.topic());
    Assertions.assertEquals(1, plainPartition.partition());

    // Scenario 3: no tags at all -> conversion is rejected, but tag parsing yields an empty map.
    final MetricName untaggedName = group.metricName(MetricsUtils.BYTES_IN_PER_SEC, Collections.emptyMap());
    Assertions.assertThrows(IllegalArgumentException.class,
        () -> MetricsUtils.toAutoBalancerMetric(timestampMs, broker, brokerRack, untaggedName, 10.5));
    final Map<String, String> parsedTags = MetricsUtils.mBeanNameToTags(untaggedName.getMBeanName());
    Assertions.assertNotNull(parsedTags);
    Assertions.assertTrue(parsedTags.isEmpty());
}
}

0 comments on commit 5097a25

Please sign in to comment.