-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
6 changed files
with
361 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
## Amazon CloudWatch Output for Telegraf | ||
|
||
This plugin will send points to Amazon CloudWatch. | ||
|
||
## Amazon Authentication | ||
|
||
This plugin uses a credential chain for Authentication with the CloudWatch | ||
API endpoint. In the following order the plugin will attempt to authenticate. | ||
1. [IAMS Role](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html) | ||
2. [Environment Variables](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk) | ||
3. [Shared Credentials](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk) | ||
|
||
## Config | ||
|
||
For this output plugin to function correctly the following variables | ||
must be configured. | ||
|
||
* region | ||
* namespace | ||
|
||
### region | ||
|
||
The region is the Amazon region that you wish to connect to. | ||
Examples include but are not limited to: | ||
* us-west-1 | ||
* us-west-2 | ||
* us-east-1 | ||
* ap-southeast-1 | ||
* ap-southeast-2 | ||
|
||
### namespace | ||
|
||
The namespace used for AWS CloudWatch metrics. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,236 @@ | ||
package cloudwatch | ||
|
||
import ( | ||
"log" | ||
"math" | ||
"sort" | ||
"strings" | ||
"time" | ||
|
||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/credentials" | ||
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" | ||
"github.com/aws/aws-sdk-go/aws/ec2metadata" | ||
"github.com/aws/aws-sdk-go/aws/session" | ||
"github.com/aws/aws-sdk-go/service/cloudwatch" | ||
|
||
"github.com/influxdata/influxdb/client/v2" | ||
"github.com/influxdata/telegraf/plugins/outputs" | ||
) | ||
|
||
type CloudWatch struct { | ||
Region string // AWS Region | ||
Namespace string // CloudWatch Metrics Namespace | ||
svc *cloudwatch.CloudWatch | ||
} | ||
|
||
var sampleConfig = ` | ||
# Amazon REGION | ||
region = 'us-east-1' | ||
# Namespace for the CloudWatch MetricDatums | ||
namespace = 'InfluxData/Telegraf' | ||
` | ||
|
||
func (c *CloudWatch) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (c *CloudWatch) Description() string { | ||
return "Configuration for AWS CloudWatch output." | ||
} | ||
|
||
func (c *CloudWatch) Connect() error { | ||
Config := &aws.Config{ | ||
Region: aws.String(c.Region), | ||
Credentials: credentials.NewChainCredentials( | ||
[]credentials.Provider{ | ||
&ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())}, | ||
&credentials.EnvProvider{}, | ||
&credentials.SharedCredentialsProvider{}, | ||
}), | ||
} | ||
|
||
svc := cloudwatch.New(session.New(Config)) | ||
|
||
params := &cloudwatch.ListMetricsInput{ | ||
Namespace: aws.String(c.Namespace), | ||
} | ||
|
||
_, err := svc.ListMetrics(params) // Try a read-only call to test connection. | ||
|
||
if err != nil { | ||
log.Printf("cloudwatch: Error in ListMetrics API call : %+v \n", err.Error()) | ||
} | ||
|
||
c.svc = svc | ||
|
||
return err | ||
} | ||
|
||
func (c *CloudWatch) Close() error { | ||
return nil | ||
} | ||
|
||
func (c *CloudWatch) Write(points []*client.Point) error { | ||
for _, pt := range points { | ||
err := c.WriteSinglePoint(pt) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Write data for a single point. A point can have many fields and one field | ||
// is equal to one MetricDatum. There is a limit on how many MetricDatums a | ||
// request can have so we process one Point at a time. | ||
func (c *CloudWatch) WriteSinglePoint(point *client.Point) error { | ||
datums := BuildMetricDatum(point) | ||
|
||
const maxDatumsPerCall = 20 // PutMetricData only supports up to 20 data points per call | ||
|
||
for _, partition := range PartitionDatums(maxDatumsPerCall, datums) { | ||
err := c.WriteToCloudWatch(partition) | ||
|
||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *CloudWatch) WriteToCloudWatch(datums []*cloudwatch.MetricDatum) error { | ||
params := &cloudwatch.PutMetricDataInput{ | ||
MetricData: datums, | ||
Namespace: aws.String(c.Namespace), | ||
} | ||
|
||
_, err := c.svc.PutMetricData(params) | ||
|
||
if err != nil { | ||
log.Printf("CloudWatch: Unable to write to CloudWatch : %+v \n", err.Error()) | ||
} | ||
|
||
return err | ||
} | ||
|
||
// Partition the MetricDatums into smaller slices of a max size so that are under the limit | ||
// for the AWS API calls. | ||
func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch.MetricDatum { | ||
|
||
numberOfPartitions := len(datums) / size | ||
if len(datums)%size != 0 { | ||
numberOfPartitions += 1 | ||
} | ||
|
||
partitions := make([][]*cloudwatch.MetricDatum, numberOfPartitions) | ||
|
||
for i := 0; i < numberOfPartitions; i++ { | ||
start := size * i | ||
end := size * (i + 1) | ||
if end > len(datums) { | ||
end = len(datums) | ||
} | ||
|
||
partitions[i] = datums[start:end] | ||
} | ||
|
||
return partitions | ||
} | ||
|
||
// Make a MetricDatum for each field in a Point. Only fields with values that can be | ||
// converted to float64 are supported. Non-supported fields are skipped. | ||
func BuildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum { | ||
datums := make([]*cloudwatch.MetricDatum, len(point.Fields())) | ||
i := 0 | ||
|
||
var value float64 | ||
|
||
for k, v := range point.Fields() { | ||
switch t := v.(type) { | ||
case int: | ||
value = float64(t) | ||
case int32: | ||
value = float64(t) | ||
case int64: | ||
value = float64(t) | ||
case float64: | ||
value = t | ||
case bool: | ||
if t { | ||
value = 1 | ||
} else { | ||
value = 0 | ||
} | ||
case time.Time: | ||
value = float64(t.Unix()) | ||
default: | ||
// Skip unsupported type. | ||
datums = datums[:len(datums)-1] | ||
continue | ||
} | ||
|
||
datums[i] = &cloudwatch.MetricDatum{ | ||
MetricName: aws.String(strings.Join([]string{point.Name(), k}, "_")), | ||
Value: aws.Float64(value), | ||
Dimensions: BuildDimensions(point.Tags()), | ||
Timestamp: aws.Time(point.Time()), | ||
} | ||
|
||
i += 1 | ||
} | ||
|
||
return datums | ||
} | ||
|
||
// Make a list of Dimensions by using a Point's tags. CloudWatch supports up to | ||
// 10 dimensions per metric so we only keep up to the first 10 alphabetically. | ||
// This always includes the "host" tag if it exists. | ||
func BuildDimensions(ptTags map[string]string) []*cloudwatch.Dimension { | ||
|
||
const MaxDimensions = 10 | ||
dimensions := make([]*cloudwatch.Dimension, int(math.Min(float64(len(ptTags)), MaxDimensions))) | ||
|
||
i := 0 | ||
|
||
// This is pretty ugly but we always want to include the "host" tag if it exists. | ||
if host, ok := ptTags["host"]; ok { | ||
dimensions[i] = &cloudwatch.Dimension{ | ||
Name: aws.String("host"), | ||
Value: aws.String(host), | ||
} | ||
i += 1 | ||
} | ||
|
||
var keys []string | ||
for k := range ptTags { | ||
if k != "host" { | ||
keys = append(keys, k) | ||
} | ||
} | ||
sort.Strings(keys) | ||
|
||
for _, k := range keys { | ||
if i >= MaxDimensions { | ||
break | ||
} | ||
|
||
dimensions[i] = &cloudwatch.Dimension{ | ||
Name: aws.String(k), | ||
Value: aws.String(ptTags[k]), | ||
} | ||
|
||
i += 1 | ||
} | ||
|
||
return dimensions | ||
} | ||
|
||
func init() { | ||
outputs.Add("cloudwatch", func() outputs.Output { | ||
return &CloudWatch{} | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
package cloudwatch | ||
|
||
import ( | ||
"sort" | ||
"testing" | ||
|
||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/service/cloudwatch" | ||
|
||
"github.com/influxdata/influxdb/client/v2" | ||
"github.com/influxdata/telegraf/testutil" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
// Test that each tag becomes one dimension | ||
func TestBuildDimensions(t *testing.T) { | ||
const MaxDimensions = 10 | ||
|
||
assert := assert.New(t) | ||
|
||
testPoint := testutil.TestPoint(1) | ||
dimensions := BuildDimensions(testPoint.Tags()) | ||
|
||
tagKeys := make([]string, len(testPoint.Tags())) | ||
i := 0 | ||
for k, _ := range testPoint.Tags() { | ||
tagKeys[i] = k | ||
i += 1 | ||
} | ||
|
||
sort.Strings(tagKeys) | ||
|
||
if len(testPoint.Tags()) >= MaxDimensions { | ||
assert.Equal(MaxDimensions, len(dimensions), "Number of dimensions should be less than MaxDimensions") | ||
} else { | ||
assert.Equal(len(testPoint.Tags()), len(dimensions), "Number of dimensions should be equal to number of tags") | ||
} | ||
|
||
for i, key := range tagKeys { | ||
if i >= 10 { | ||
break | ||
} | ||
assert.Equal(key, *dimensions[i].Name, "Key should be equal") | ||
assert.Equal(testPoint.Tags()[key], *dimensions[i].Value, "Value should be equal") | ||
} | ||
} | ||
|
||
// Test that points with valid values have a MetricDatum created where as non valid do not. | ||
// Skips "time.Time" type as something is converting the value to string. | ||
func TestBuildMetricDatums(t *testing.T) { | ||
assert := assert.New(t) | ||
|
||
validPoints := []*client.Point{ | ||
testutil.TestPoint(1), | ||
testutil.TestPoint(int32(1)), | ||
testutil.TestPoint(int64(1)), | ||
testutil.TestPoint(float64(1)), | ||
testutil.TestPoint(true), | ||
} | ||
|
||
for _, point := range validPoints { | ||
datums := BuildMetricDatum(point) | ||
assert.Equal(1, len(datums), "Valid type should create a Datum") | ||
} | ||
|
||
nonValidPoint := testutil.TestPoint("Foo") | ||
|
||
assert.Equal(0, len(BuildMetricDatum(nonValidPoint)), "Invalid type should not create a Datum") | ||
} | ||
|
||
func TestPartitionDatums(t *testing.T) { | ||
|
||
assert := assert.New(t) | ||
|
||
testDatum := cloudwatch.MetricDatum{ | ||
MetricName: aws.String("Foo"), | ||
Value: aws.Float64(1), | ||
} | ||
|
||
oneDatum := []*cloudwatch.MetricDatum{&testDatum} | ||
twoDatum := []*cloudwatch.MetricDatum{&testDatum, &testDatum} | ||
threeDatum := []*cloudwatch.MetricDatum{&testDatum, &testDatum, &testDatum} | ||
|
||
assert.Equal([][]*cloudwatch.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum)) | ||
assert.Equal([][]*cloudwatch.MetricDatum{twoDatum}, PartitionDatums(2, twoDatum)) | ||
assert.Equal([][]*cloudwatch.MetricDatum{twoDatum, oneDatum}, PartitionDatums(2, threeDatum)) | ||
} |