AMQP Consumer plugin #1678

Merged
merged 8 commits into from
Mar 3, 2017
Changes from 6 commits
3 changes: 2 additions & 1 deletion README.md
@@ -97,9 +97,10 @@ configuration options.

## Input Plugins

* [aws cloudwatch](./plugins/inputs/cloudwatch)
* [aerospike](./plugins/inputs/aerospike)
* [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq)
* [apache](./plugins/inputs/apache)
* [aws cloudwatch](./plugins/inputs/cloudwatch)
* [bcache](./plugins/inputs/bcache)
* [cassandra](./plugins/inputs/cassandra)
* [ceph](./plugins/inputs/ceph)
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
@@ -2,6 +2,7 @@ package all

import (
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
_ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
35 changes: 35 additions & 0 deletions plugins/inputs/amqp_consumer/README.md
@@ -0,0 +1,35 @@
# AMQP Consumer Input Plugin

This plugin reads data from an AMQP Queue ([RabbitMQ](https://www.rabbitmq.com/) being an example) formatted in one of
Review comment (Contributor):
Here I think we should mention that this plugin is for the AMQP 0.9.1 protocol, and link to the RabbitMQ doc about it: https://www.rabbitmq.com/amqp-0-9-1-reference.html & https://www.rabbitmq.com/tutorials/amqp-concepts.html

the [Telegraf Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).

The following defaults are set to work with RabbitMQ:

```
Review comment (Contributor):
Put toml directly after the opening three backticks, so that GitHub colorizes the README.

# AMQP consumer plugin
[[inputs.amqp_consumer]]
## AMQP url
url = "amqp://localhost:5672/influxdb"
## AMQP exchange
exchange = "telegraf"
## Auth method. PLAIN and EXTERNAL are supported
# auth_method = "PLAIN"
## Binding Key
binding_key = "#"

## Maximum number of messages server should give to the worker.
prefetch = 50

## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false

## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
```
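
For readers unfamiliar with topic exchanges: the sample config binds the queue with `binding_key = "#"`, and the plugin declares the exchange with type "topic". In AMQP 0.9.1 topic matching, routing keys are dot-separated words, `*` matches exactly one word, and `#` matches zero or more words, so `#` receives every message published to the exchange. The following is a minimal sketch of that matching rule in plain Go. It is an illustration only, not the broker's implementation, and the function names are hypothetical.

```go
package main

import "strings"

// topicMatches reports whether a routing key matches a topic-exchange
// binding pattern. Words are dot-separated; "*" matches exactly one word
// and "#" matches zero or more words.
func topicMatches(pattern, key string) bool {
	return matchWords(splitWords(pattern), splitWords(key))
}

// splitWords splits a dot-separated key; the empty key has no words.
func splitWords(s string) []string {
	if s == "" {
		return nil
	}
	return strings.Split(s, ".")
}

func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// "#" may absorb zero words, or absorb one word and try again.
		if matchWords(p[1:], k) {
			return true
		}
		return len(k) > 0 && matchWords(p, k[1:])
	case "*":
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}
```

With this rule, a narrower key such as `cpu.*` would accept `cpu.host1` but reject `cpu.host1.idle`, which is one way to split traffic between several consumers.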
246 changes: 246 additions & 0 deletions plugins/inputs/amqp_consumer/amqp_consumer.go
@@ -0,0 +1,246 @@
package amqp_consumer

import (
"fmt"
"log"
"strings"
"sync"

"github.com/streadway/amqp"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)

// AMQPConsumer is the top level struct for this plugin
type AMQPConsumer struct {
URL string
// AMQP exchange
Exchange string

// AMQP Auth method
AuthMethod string
// Binding Key
BindingKey string `toml:"binding_key"`

Prefetch int
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool

sync.Mutex

parser parsers.Parser
conn *amqp.Connection
ch *amqp.Channel
wg *sync.WaitGroup
}

type externalAuth struct{}

func (a *externalAuth) Mechanism() string {
return "EXTERNAL"
}
func (a *externalAuth) Response() string {
return fmt.Sprintf("\000")
}

const (
DefaultAuthMethod = "PLAIN"
DefaultPrefetch = 50
)

func (a *AMQPConsumer) SampleConfig() string {
return `
## AMQP url
url = "amqp://localhost:5672/influxdb"
## AMQP exchange
exchange = "telegraf"
## Auth method. PLAIN and EXTERNAL are supported
Review comment (Contributor):
Does EXTERNAL require the AMQP instance to have a specific plugin enabled? Could you link to a doc about this? (I could be wrong, but https://www.rabbitmq.com/authentication.html seems to suggest it requires a plugin.)

Review comment (Contributor):
Yes, it is required; I added a link in the latest push.

# auth_method = "PLAIN"
## Binding Key
binding_key = "#"

## Maximum number of messages server should give to the worker.
Review comment (Contributor, @sparrc, Mar 2, 2017):
I would expand on this a bit, using language directly from either the streadway AMQP library or from the RabbitMQ documentation.

You can also link to external documentation here.

Review comment (Contributor):
(especially mention that this is a channel QoS setting)

prefetch = 50

## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false

## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
Review comment (Contributor):
You should put the standard data format comment here, the same one that is used in other plugins.

`
}
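
On the prefetch discussion above: `prefetch` is passed to the channel's QoS call as the prefetch count, which caps how many deliveries may be outstanding (delivered but not yet acknowledged) at once. It only has an effect because the consumer is started with auto-ack disabled. The sketch below models that rule as simple arithmetic so the effect of `prefetch = 50` is easy to see. It is a simplified model with a hypothetical function name, not code from the broker or the client library.

```go
package main

// deliverable returns how many more messages a broker may push to a
// consumer, given the channel's prefetch count, the number of deliveries
// still unacknowledged, and the number of messages waiting in the queue.
// A prefetch count of 0 means "no limit".
func deliverable(prefetch, unacked, queued int) int {
	if prefetch == 0 {
		return queued
	}
	room := prefetch - unacked
	if room < 0 {
		room = 0
	}
	if queued < room {
		return queued
	}
	return room
}
```

For example, with a prefetch of 50 and 200 queued messages, the model allows 50 deliveries up front and then one more for each acknowledgement.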

func (a *AMQPConsumer) Description() string {
return "AMQP consumer plugin"
}

func (a *AMQPConsumer) SetParser(parser parsers.Parser) {
a.parser = parser
}

// All gathering is done in the Start function
func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error {
return nil
}

func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
// make new tls config
tls, err := internal.GetTLSConfig(
a.SSLCert, a.SSLKey, a.SSLCA, a.InsecureSkipVerify)
if err != nil {
return nil, err
}

// parse auth method
var sasl []amqp.Authentication // nil by default

if strings.ToUpper(a.AuthMethod) == "EXTERNAL" {
sasl = []amqp.Authentication{&externalAuth{}}
}

config := amqp.Config{
TLSClientConfig: tls,
SASL: sasl, // if nil, it will be PLAIN
}
return &config, nil
}
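
A note on the auth selection in `createConfig`: the client library's authentication interface has two methods, `Mechanism()` and `Response()`, and the plugin only supplies its own implementation for EXTERNAL, leaving the slice nil so the library falls back to PLAIN. The sketch below redeclares that two-method interface locally (an assumption made so the example compiles without the library) and shows that the NUL response can be written as a plain string literal instead of `fmt.Sprintf("\000")`. The name `pickAuth` is hypothetical.

```go
package main

import "strings"

// authentication mirrors the two-method shape of the client library's
// Authentication interface; it is redeclared here so the sketch stands alone.
type authentication interface {
	Mechanism() string
	Response() string
}

type externalAuth struct{}

func (externalAuth) Mechanism() string { return "EXTERNAL" }

// Response returns a single NUL byte, the same value that
// fmt.Sprintf("\000") produces, without a format call.
func (externalAuth) Response() string { return "\x00" }

// pickAuth returns the SASL mechanisms for the configured method.
// A nil result means "let the library use its default (PLAIN)".
func pickAuth(method string) []authentication {
	if strings.ToUpper(method) == "EXTERNAL" {
		return []authentication{externalAuth{}}
	}
	return nil
}
```

Because the comparison upper-cases the input, `auth_method = "external"` and `"EXTERNAL"` behave the same; any other value silently selects the default.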

// Start satisfies the telegraf.ServiceInput interface
func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
amqpConf, err := a.createConfig()
if err != nil {
return err
}

conn, err := amqp.DialConfig(a.URL, *amqpConf)
if err != nil {
return err
}
a.conn = conn

// Create channel and assign it to AMQPConsumer
ch, err := conn.Channel()
if err != nil {
return fmt.Errorf("%v: Failed to open a channel", err)
}
a.ch = ch

err = ch.ExchangeDeclare(
a.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("Failed to declare an exchange: %s", err)
Review comment (Contributor):
This error message is inconsistent with the rest. Should be:

return fmt.Errorf("%v: Failed to declare an exchange", err)

Review comment (Contributor):
I usually use %s for errors. Is there a benefit to using %v? If err != nil, the string format will always work properly; I believe it will just print whatever err.Error() returns.

Review comment (Contributor):
I'll stick to the current convention of "Failed to blah blah: %s"

}

// Declare a queue and assign it to AMQPConsumer
q, err := ch.QueueDeclare(
"telegraf", // queue
Review comment (Contributor):
Should the queue name be configurable? What if a user wanted to consume from two different Telegraf instances?

Review comment (Contributor):
I added support for this.

true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("%v: Failed to declare a queue", err)
}

err = ch.QueueBind(
q.Name, // queue
a.BindingKey, // binding-key
a.Exchange, // exchange
false,
nil,
)
if err != nil {
return fmt.Errorf("%v: Failed to bind a queue", err)
}

// Declare QoS on queue
err = ch.Qos(
a.Prefetch,
0, // prefetch-size
false, // global
)
if err != nil {
return fmt.Errorf("%v: failed to set Qos", err)
Review comment (Contributor):
Should be QoS in error message.

}

msgs, err := a.ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("%v: failed establishing connection to queue", err)
}

a.wg = &sync.WaitGroup{}
a.wg.Add(1)
go a.process(msgs, acc)

// Log that service has started
log.Println("I! Starting AMQP service...")
return nil
}
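
On the error-formatting thread in `Start`: for a non-nil `error`, both `%s` and `%v` print the result of `Error()`, so the choice between them is about consistency rather than behavior. The sketch below checks that equivalence and shows the "Failed to <action>: <cause>" shape the thread settled on. The helper names and the sample error are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errRefused is a sample cause used only for illustration.
var errRefused = errors.New("connection refused")

// wrap formats an error using the "Failed to <action>: <cause>" convention.
func wrap(action string, err error) error {
	return fmt.Errorf("Failed to %s: %s", action, err)
}

// sameOutput reports whether %s and %v render a non-nil error identically.
func sameOutput(err error) bool {
	return fmt.Sprintf("%s", err) == fmt.Sprintf("%v", err)
}
```

Putting the action first and the cause last keeps log lines readable when several wrapped errors are chained, which is one argument for applying a single convention to every message in this function.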

// Read messages from queue and add them to the Accumulator
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
defer a.wg.Done()
for d := range msgs {
Review comment (Contributor, @sparrc, Mar 2, 2017):
What happens here when the channel is closed? Does the loop just exit cleanly? How does that happen?

I would have thought it would start sending nils.

Review comment (Contributor):
"When the channel or connection closes, all delivery chans will also close. " - https://godoc.org/github.com/streadway/amqp#Channel.Consume

metric, err := a.parser.Parse(d.Body)
Review comment (Contributor):
nit: metric -> metrics

if err != nil {
log.Printf("E! %v: error parsing metric - %v", err, string(d.Body))
} else {
for _, m := range metric {
acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
}

d.Ack(false)
}
log.Println("I! Stopped AMQP service")
}
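
To illustrate the answer given in the thread above: a `for ... range` loop over a Go channel ends once the channel is closed and drained, without yielding zero values, so `process` exits cleanly when the library closes the delivery channel. The sketch below reproduces that behavior with a plain channel of byte slices standing in for deliveries. The function names are hypothetical and no AMQP library is involved.

```go
package main

import "sync"

// drain consumes from msgs until the channel is closed and returns how
// many values it handled. The range loop ends on close; it never sees nil.
func drain(msgs <-chan []byte) int {
	n := 0
	for range msgs {
		n++
	}
	return n
}

// runOnce sends count messages, closes the channel, and waits for the
// worker goroutine, mirroring the Start/Stop pairing in the plugin.
func runOnce(count int) int {
	msgs := make(chan []byte)
	var wg sync.WaitGroup
	handled := 0

	wg.Add(1)
	go func() {
		defer wg.Done() // runs after handled has been assigned
		handled = drain(msgs)
	}()

	for i := 0; i < count; i++ {
		msgs <- []byte("cpu value=1")
	}
	close(msgs) // stands in for the library closing its delivery channel
	wg.Wait()
	return handled
}
```

The `wg.Wait()` after `close` is what makes it safe to read `handled`, and it is the same reason `Stop` waits on the WaitGroup after closing the AMQP channel.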

func (a *AMQPConsumer) Stop() {
a.Lock()
defer a.Unlock()
Review comment (Contributor):
Should we call a.ch.Cancel()? https://godoc.org/github.com/streadway/amqp#Channel.Cancel

The answer could be no. TBH, I don't really understand the difference between "Channel.Cancel" and "Channel.Close".

Review comment (Contributor, @sparrc, Mar 2, 2017):
FWIW, there's probably nothing to change here, this example code from the library itself simply closes the connection, which "takes the channel with it": https://github.com/streadway/amqp/blob/master/_examples/pubsub/pubsub.go#L37

(note that the session struct in that example has an s.Channel object defined, but doesn't get used in the Close() func)

Review comment (Contributor):
That said, another example calls Cancel and then Close: https://github.com/streadway/amqp/blob/master/_examples/simple-consumer/consumer.go#L141

But it calls Cancel with noWait=true, which is completely pointless AFAICT.

Review comment (Contributor):
I think it only matters if you will use multiple consumers on a single channel. We should be able to just close the connection as well.

a.ch.Close()
a.wg.Wait()
a.conn.Close()
}

func init() {
inputs.Add("amqp_consumer", func() telegraf.Input {
return &AMQPConsumer{
AuthMethod: DefaultAuthMethod,
Prefetch: DefaultPrefetch,
}
})
}
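
For context on the `init` function above: plugins register a creator function under a name, and each call to the creator returns a fresh instance with the defaults already applied, which is why `AuthMethod` and `Prefetch` are set here rather than in `Start`. The sketch below shows that registration pattern with local stand-in types. The `input`, `creator`, `registry`, and `add` names are simplified assumptions, not the real Telegraf declarations.

```go
package main

// input is a stand-in for the plugin interface, reduced to one method.
type input interface {
	Description() string
}

// creator builds a fresh plugin instance with its defaults applied.
type creator func() input

// registry maps plugin names to creators (hypothetical stand-in).
var registry = map[string]creator{}

// add registers a creator under a plugin name.
func add(name string, c creator) { registry[name] = c }

type consumer struct {
	AuthMethod string
	Prefetch   int
}

func (c *consumer) Description() string { return "AMQP consumer plugin" }

func init() {
	add("amqp_consumer", func() input {
		return &consumer{AuthMethod: "PLAIN", Prefetch: 50}
	})
}
```

Because the creator allocates a new struct on every call, two `[[inputs.amqp_consumer]]` sections would get independent instances, and values from the config file then override the defaults on each one.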
3 changes: 1 addition & 2 deletions plugins/outputs/amqp/README.md
@@ -1,7 +1,6 @@
# AMQP Output Plugin

This plugin writes to a AMQP exchange using tag, defined in configuration file
as RoutingTag, as a routing key.
This plugin writes to a AMQP exchange, like [RabbitMQ](https://www.rabbitmq.com/) using tag, defined in configuration file as RoutingTag, as a routing key.

If RoutingTag is empty, then empty routing key will be used.
Metrics are grouped in batches by RoutingTag.
7 changes: 6 additions & 1 deletion plugins/outputs/amqp/amqp.go
@@ -40,6 +40,7 @@ type AMQP struct {
// Use SSL but skip chain & host verification
InsecureSkipVerify bool

conn *amqp.Connection
channel *amqp.Channel
sync.Mutex
headers amqp.Table
@@ -129,6 +130,8 @@ func (q *AMQP) Connect() error {
if err != nil {
return err
}
q.conn = connection

channel, err := connection.Channel()
if err != nil {
return fmt.Errorf("Failed to open a channel: %s", err)
Review comment (Contributor):
Error message inconsistent here as compared to others.

@@ -160,7 +163,8 @@ func (q *AMQP) Connect() error {
}

func (q *AMQP) Close() error {
return q.channel.Close()
q.channel.Close()
Review comment (Contributor):
Check for a close error here and return it.

return q.conn.Close()
}
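
One way to address the review comment on `Close` is to close both resources in order and report the first failure while still attempting the second close. The sketch below does this against `io.Closer`, which both the AMQP channel and connection types satisfy since each has a `Close() error` method. The `closeAll` helper and the `fakeCloser` test double are hypothetical.

```go
package main

import (
	"errors"
	"io"
)

// errFake is a sample failure used only for illustration.
var errFake = errors.New("channel already closed")

// closeAll closes each resource in order and returns the first error
// seen, while still attempting the remaining closes.
func closeAll(closers ...io.Closer) error {
	var first error
	for _, c := range closers {
		if err := c.Close(); err != nil && first == nil {
			first = err
		}
	}
	return first
}

// fakeCloser records whether Close was called and returns a preset error.
type fakeCloser struct {
	err    error
	closed bool
}

func (f *fakeCloser) Close() error {
	f.closed = true
	return f.err
}
```

In the output plugin this would read roughly as `return closeAll(q.channel, q.conn)`, so a failed channel close no longer hides behind a successful connection close.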

func (q *AMQP) SampleConfig() string {
@@ -205,6 +209,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
Headers: q.headers,
ContentType: "text/plain",
Body: buf,
// DeliveryMode: amqp.Persistent,
Review comment (Contributor):
Should this comment be removed?

})
if err != nil {
return fmt.Errorf("FAILED to send amqp message: %s", err)
Review comment (Contributor):
Error formatting is inconsistent here as compared to the others.
