Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prometheus metric_version = 2 & make scrape url as a configurable tag #5767

Merged
merged 9 commits into from
Nov 21, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ in Prometheus format.
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]

## Metric version (optional, default=1, supported values are 1 and 2)
# metric_version = 2

## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]

Expand Down Expand Up @@ -135,3 +138,18 @@ cpu_usage_user,cpu=cpu1,url=http://example.org:9273/metrics gauge=5.829145728641
cpu_usage_user,cpu=cpu2,url=http://example.org:9273/metrics gauge=2.119071644805144 1505776751000000000
cpu_usage_user,cpu=cpu3,url=http://example.org:9273/metrics gauge=1.5228426395944945 1505776751000000000
```

**Output (when metric_version = 2)**
```
prometheus,quantile=1,url=http://example.org:9273/metrics go_gc_duration_seconds=0.005574303 1556075100000000000
prometheus,quantile=0.75,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0001046 1556075100000000000
prometheus,quantile=0.5,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000719 1556075100000000000
prometheus,quantile=0.25,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000579 1556075100000000000
prometheus,quantile=0,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000349 1556075100000000000
prometheus,url=http://example.org:9273/metrics go_gc_duration_seconds_count=324,go_gc_duration_seconds_sum=0.091340353 1556075100000000000
prometheus,url=http://example.org:9273/metrics go_goroutines=15 1556075100000000000
prometheus,cpu=cpu0,url=http://example.org:9273/metrics cpu_usage_user=1.513622603430151 1505776751000000000
prometheus,cpu=cpu1,url=http://example.org:9273/metrics cpu_usage_user=5.829145728641773 1505776751000000000
prometheus,cpu=cpu2,url=http://example.org:9273/metrics cpu_usage_user=2.119071644805144 1505776751000000000
prometheus,cpu=cpu3,url=http://example.org:9273/metrics cpu_usage_user=1.5228426395944945 1505776751000000000
```
158 changes: 158 additions & 0 deletions plugins/inputs/prometheus/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,145 @@ import (
"github.com/prometheus/common/expfmt"
)

// ParseV2 converts a Prometheus exposition payload into telegraf metrics using
// the metric_version = 2 layout: every resulting metric is named "prometheus"
// and each sample value is stored in a field named after its metric family.
//
// buf is the raw response body. header supplies the Content-Type: a delimited
// io.prometheus.client.MetricFamily protobuf stream is decoded as protobuf,
// anything else (including a missing or unparsable Content-Type) is decoded as
// the Prometheus text format.
//
// It returns an error only when the payload cannot be decoded. Individual
// samples that produce no fields, or whose metric cannot be constructed, are
// skipped.
func ParseV2(buf []byte, header http.Header) ([]telegraf.Metric, error) {
	var metrics []telegraf.Metric
	var parser expfmt.TextParser
	// parse even if the buffer begins with a newline
	buf = bytes.TrimPrefix(buf, []byte("\n"))
	// Read raw data
	buffer := bytes.NewBuffer(buf)
	reader := bufio.NewReader(buffer)

	mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
	// Prepare output
	metricFamilies := make(map[string]*dto.MetricFamily)

	if err == nil && mediatype == "application/vnd.google.protobuf" &&
		params["encoding"] == "delimited" &&
		params["proto"] == "io.prometheus.client.MetricFamily" {
		for {
			mf := &dto.MetricFamily{}
			if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
				if ierr == io.EOF {
					break
				}
				return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr)
			}
			metricFamilies[mf.GetName()] = mf
		}
	} else {
		// A Content-Type parse error is deliberately not fatal: the payload is
		// simply treated as text, and err is overwritten by the text parser.
		metricFamilies, err = parser.TextToMetricFamilies(reader)
		if err != nil {
			return nil, fmt.Errorf("reading text format failed: %s", err)
		}
	}

	// read metrics
	for metricName, mf := range metricFamilies {
		for _, m := range mf.Metric {
			// reading tags
			tags := makeLabels(m)

			switch mf.GetType() {
			case dto.MetricType_SUMMARY:
				// summary metric
				metrics = append(metrics, makeQuantilesV2(m, tags, metricName, mf.GetType())...)
			case dto.MetricType_HISTOGRAM:
				// histogram metric
				metrics = append(metrics, makeBucketsV2(m, tags, metricName, mf.GetType())...)
			default:
				// standard metric: a sample without a usable value yields no
				// fields and is dropped.
				fields := getNameAndValueV2(m, metricName)
				if len(fields) == 0 {
					continue
				}

				// Prefer the sample's own timestamp (milliseconds since the
				// epoch); fall back to the time of parsing.
				var t time.Time
				if m.TimestampMs != nil && *m.TimestampMs > 0 {
					t = time.Unix(0, *m.TimestampMs*1000000)
				} else {
					t = time.Now()
				}

				// Best effort: a metric that fails to construct is skipped
				// rather than aborting the whole scrape.
				tm, merr := metric.New("prometheus", tags, fields, t, valueType(mf.GetType()))
				if merr == nil {
					metrics = append(metrics, tm)
				}
			}
		}
	}

	// Every decode failure has already returned above, so there is no error
	// left to report here.
	return metrics, nil
}

// makeQuantilesV2 converts one summary sample into telegraf metrics named
// "prometheus".
//
// The first metric carries the <metricName>_count and <metricName>_sum fields
// with the given tags. It is followed by one metric per quantile whose value
// is not NaN, carrying the quantile value in a field named metricName and an
// extra "quantile" tag.
//
// tags is never modified: every quantile metric is built from its own copy.
// All metrics share one timestamp, taken from the sample when it has a
// positive TimestampMs and from time.Now() otherwise. Metrics that fail to
// construct are skipped.
func makeQuantilesV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType) []telegraf.Metric {
	var metrics []telegraf.Metric

	var t time.Time
	if m.TimestampMs != nil && *m.TimestampMs > 0 {
		t = time.Unix(0, *m.TimestampMs*1000000)
	} else {
		t = time.Now()
	}

	fields := make(map[string]interface{})
	fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount())
	fields[metricName+"_sum"] = float64(m.GetSummary().GetSampleSum())
	met, err := metric.New("prometheus", tags, fields, t, valueType(metricType))
	if err == nil {
		metrics = append(metrics, met)
	}

	// Use the nil-safe getter so a sample without a summary payload yields no
	// quantiles instead of dereferencing a nil pointer.
	for _, q := range m.GetSummary().GetQuantile() {
		if math.IsNaN(q.GetValue()) {
			continue
		}

		// Copy the tags instead of aliasing the caller's map: writing the
		// "quantile" key into a shared map would leak it to the caller.
		// NOTE(review): it would also leak into any metric that kept a
		// reference to the map; whether metric.New copies it is not visible here.
		quantileTags := make(map[string]string, len(tags)+1)
		for k, v := range tags {
			quantileTags[k] = v
		}
		quantileTags["quantile"] = fmt.Sprint(q.GetQuantile())

		quantileFields := map[string]interface{}{
			metricName: float64(q.GetValue()),
		}

		quantileMetric, err := metric.New("prometheus", quantileTags, quantileFields, t, valueType(metricType))
		if err == nil {
			metrics = append(metrics, quantileMetric)
		}
	}
	return metrics
}

// Get Buckets from histogram metric
func makeBucketsV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType) []telegraf.Metric {
var metrics []telegraf.Metric
fields := make(map[string]interface{})
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = time.Now()
}
fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount())
fields[metricName+"_sum"] = float64(m.GetHistogram().GetSampleSum())

met, err := metric.New("prometheus", tags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, met)
}

for _, b := range m.GetHistogram().Bucket {
newTags := tags
fields = make(map[string]interface{})
newTags["le"] = fmt.Sprint(b.GetUpperBound())
fields[metricName] = float64(b.GetCumulativeCount())
vishiy marked this conversation as resolved.
Show resolved Hide resolved

histogramMetric, err := metric.New("prometheus", newTags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, histogramMetric)
}
}
return metrics
}

// Parse returns a slice of Metrics from a text representation of a
// metrics
func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
Expand Down Expand Up @@ -159,3 +298,22 @@ func getNameAndValue(m *dto.Metric) map[string]interface{} {
}
return fields
}

// getNameAndValueV2 builds the field map for a plain (non-summary,
// non-histogram) sample. The first populated payload among Gauge, Counter and
// Untyped, checked in that order, supplies the value, which is stored under
// metricName. The returned map is empty when none of the three is set or when
// the selected value is NaN.
func getNameAndValueV2(m *dto.Metric, metricName string) map[string]interface{} {
	result := make(map[string]interface{})

	var value float64
	switch {
	case m.Gauge != nil:
		value = m.GetGauge().GetValue()
	case m.Counter != nil:
		value = m.GetCounter().GetValue()
	case m.Untyped != nil:
		value = m.GetUntyped().GetValue()
	default:
		// No supported payload on this sample.
		return result
	}

	if !math.IsNaN(value) {
		result[metricName] = value
	}
	return result
}
27 changes: 25 additions & 2 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type Prometheus struct {

ResponseTimeout internal.Duration `toml:"response_timeout"`

MetricVersion int `toml:"metric_version"`

UrlTag string `toml:"url_tag"`
vishiy marked this conversation as resolved.
Show resolved Hide resolved

tls.ClientConfig

client *http.Client
Expand All @@ -53,6 +57,12 @@ var sampleConfig = `
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]

## Metric version (optional, default=1, supported values are 1 and 2)
# metric_version = 2

## Url tag name (tag containing scrapped url. optional, default is "url")
# url_tag = "scrapeUrl"

## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]

Expand Down Expand Up @@ -214,6 +224,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
var req *http.Request
var err error
var uClient *http.Client
var metrics []telegraf.Metric
if u.URL.Scheme == "unix" {
path := u.URL.Query().Get("path")
if path == "" {
Expand Down Expand Up @@ -273,17 +284,29 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
return fmt.Errorf("error reading body: %s", err)
}

metrics, err := Parse(body, resp.Header)
if p.MetricVersion == 2 {
metrics, err = ParseV2(body, resp.Header)
log.Printf("prometheus: using metric_version = %v", p.MetricVersion)
vishiy marked this conversation as resolved.
Show resolved Hide resolved
} else {
metrics, err = Parse(body, resp.Header)
}

if err != nil {
return fmt.Errorf("error reading metrics for %s: %s",
u.URL, err)
}

var urltag = "url"
vishiy marked this conversation as resolved.
Show resolved Hide resolved
if p.UrlTag != "" {
urltag = p.UrlTag
log.Printf("prometheus: using url_tag = %v", urltag)
vishiy marked this conversation as resolved.
Show resolved Hide resolved
}

for _, metric := range metrics {
tags := metric.Tags()
// strip user and password from URL
u.OriginalURL.User = nil
tags["url"] = u.OriginalURL.String()
tags[urltag] = u.OriginalURL.String()
if u.Address != "" {
tags["address"] = u.Address
}
Expand Down
59 changes: 59 additions & 0 deletions plugins/inputs/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ go_goroutines 15
# TYPE test_metric untyped
test_metric{label="value"} 1.0 1490802350000
`
// sampleSummaryTextFormat is a Prometheus text-format payload holding a single
// summary family, go_gc_duration_seconds, with five quantiles (0, 0.25, 0.5,
// 0.75, 1) plus its _sum and _count series. None of the samples carries an
// explicit timestamp.
const sampleSummaryTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.00010425500000000001
go_gc_duration_seconds{quantile="0.25"} 0.000139108
go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002
go_gc_duration_seconds{quantile="0.75"} 0.000331463
go_gc_duration_seconds{quantile="1"} 0.000667154
go_gc_duration_seconds_sum 0.0018183950000000002
go_gc_duration_seconds_count 7
`
// sampleGaugeTextFormat is a Prometheus text-format payload holding a single
// gauge, go_goroutines, with value 15 and an explicit timestamp of
// 1490802350000 (milliseconds). The payload intentionally starts with a
// newline.
const sampleGaugeTextFormat = `
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 15 1490802350000
`

func TestPrometheusGeneratesMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -103,3 +118,47 @@ func TestPrometheusGeneratesMetricsAlthoughFirstDNSFails(t *testing.T) {
assert.True(t, acc.HasFloatField("test_metric", "value"))
assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
}

// TestPrometheusGeneratesSummaryMetricsV2 scrapes a summary payload with
// metric_version = 2 and checks that the "prometheus" measurement exposes a
// quantile tag, the _sum and _count fields, and the default "url" tag.
func TestPrometheusGeneratesSummaryMetricsV2(t *testing.T) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, sampleSummaryTextFormat)
	}))
	defer ts.Close()

	p := &Prometheus{
		URLs:          []string{ts.URL},
		MetricVersion: 2,
	}

	var acc testutil.Accumulator

	err := acc.GatherError(p.Gather)
	require.NoError(t, err)

	// assert.Equal reports both values on failure, unlike assert.True(a == b).
	assert.Equal(t, "0", acc.TagSetValue("prometheus", "quantile"))
	assert.True(t, acc.HasFloatField("prometheus", "go_gc_duration_seconds_sum"))
	assert.True(t, acc.HasFloatField("prometheus", "go_gc_duration_seconds_count"))
	assert.Equal(t, ts.URL+"/metrics", acc.TagValue("prometheus", "url"))
}

// TestPrometheusGeneratesGaugeMetricsV2 scrapes a gauge payload with
// metric_version = 2 and checks that the "prometheus" measurement exposes the
// gauge as a float field, carries the default "url" tag, and keeps the
// timestamp supplied in the payload.
func TestPrometheusGeneratesGaugeMetricsV2(t *testing.T) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, sampleGaugeTextFormat)
	}))
	defer ts.Close()

	p := &Prometheus{
		URLs:          []string{ts.URL},
		MetricVersion: 2,
	}

	var acc testutil.Accumulator

	err := acc.GatherError(p.Gather)
	require.NoError(t, err)

	assert.True(t, acc.HasFloatField("prometheus", "go_goroutines"))
	// assert.Equal reports both values on failure, unlike assert.True(a == b).
	assert.Equal(t, ts.URL+"/metrics", acc.TagValue("prometheus", "url"))
	assert.True(t, acc.HasTimestamp("prometheus", time.Unix(1490802350, 0)))
}
12 changes: 12 additions & 0 deletions testutil/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,18 @@ func (a *Accumulator) HasTag(measurement string, key string) bool {
return false
}

// TagSetValue scans the accumulated metrics in order and returns the value of
// tag key from the first metric of the given measurement that carries that
// tag. It returns "" when no metric of that measurement has the tag.
func (a *Accumulator) TagSetValue(measurement string, key string) string {
	for _, m := range a.Metrics {
		if m.Measurement != measurement {
			continue
		}
		if value, found := m.Tags[key]; found {
			return value
		}
	}
	return ""
}

func (a *Accumulator) TagValue(measurement string, key string) string {
for _, p := range a.Metrics {
if p.Measurement == measurement {
Expand Down