AMQP Consumer plugin #1678
Changes from 6 commits
3f6a98f
640b984
c4e609c
863eea7
a321953
b7f217c
d4693ee
e30439b
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
# AMQP Consumer Input Plugin | ||
|
||
This plugin reads metrics from an AMQP queue (for example, [RabbitMQ](https://www.rabbitmq.com/)). Messages must be formatted in one of | ||
the [Telegraf Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md). | ||
|
||
The following defaults are set to work with RabbitMQ: | ||
|
||
``` | ||
Review comment: put [code snippet lost in extraction]; this will tell GitHub to colorize the README. |
||
# AMQP consumer plugin | ||
[[inputs.amqp_consumer]] | ||
## AMQP url | ||
url = "amqp://localhost:5672/influxdb" | ||
## AMQP exchange | ||
exchange = "telegraf" | ||
## Auth method. PLAIN and EXTERNAL are supported | ||
# auth_method = "PLAIN" | ||
## Binding Key | ||
binding_key = "#" | ||
|
||
## Maximum number of unacknowledged messages the server delivers to this consumer (channel QoS prefetch count). | ||
prefetch = 50 | ||
|
||
## Optional SSL Config | ||
# ssl_ca = "/etc/telegraf/ca.pem" | ||
# ssl_cert = "/etc/telegraf/cert.pem" | ||
# ssl_key = "/etc/telegraf/key.pem" | ||
## Use SSL but skip chain & host verification | ||
# insecure_skip_verify = false | ||
|
||
## Data format to consume. | ||
## Each data format has its own unique set of configuration options, read | ||
## more about them here: | ||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | ||
data_format = "influx" | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,246 @@ | ||
package amqp_consumer | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
"strings" | ||
"sync" | ||
|
||
"github.com/streadway/amqp" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/internal" | ||
"github.com/influxdata/telegraf/plugins/inputs" | ||
"github.com/influxdata/telegraf/plugins/parsers" | ||
) | ||
|
||
// AMQPConsumer is the top level struct for this plugin | ||
type AMQPConsumer struct { | ||
URL string | ||
// AMQP exchange | ||
Exchange string | ||
|
||
// AMQP Auth method | ||
AuthMethod string | ||
// Binding Key | ||
BindingKey string `toml:"binding_key"` | ||
|
||
Prefetch int | ||
// Path to CA file | ||
SSLCA string `toml:"ssl_ca"` | ||
// Path to host cert file | ||
SSLCert string `toml:"ssl_cert"` | ||
// Path to cert key file | ||
SSLKey string `toml:"ssl_key"` | ||
// Use SSL but skip chain & host verification | ||
InsecureSkipVerify bool | ||
|
||
sync.Mutex | ||
|
||
parser parsers.Parser | ||
conn *amqp.Connection | ||
ch *amqp.Channel | ||
wg *sync.WaitGroup | ||
} | ||
|
||
type externalAuth struct{} | ||
|
||
func (a *externalAuth) Mechanism() string { | ||
return "EXTERNAL" | ||
} | ||
func (a *externalAuth) Response() string { | ||
return "\000" | ||
} | ||
|
||
const ( | ||
DefaultAuthMethod = "PLAIN" | ||
DefaultPrefetch = 50 | ||
) | ||
|
||
func (a *AMQPConsumer) SampleConfig() string { | ||
return ` | ||
## AMQP url | ||
url = "amqp://localhost:5672/influxdb" | ||
## AMQP exchange | ||
exchange = "telegraf" | ||
## Auth method. PLAIN and EXTERNAL are supported | ||
Review comment: Does EXTERNAL require the AMQP instance to have a specific plugin enabled? Could you link to a doc about this? (I could be wrong, but https://www.rabbitmq.com/authentication.html seems to suggest it requires a plugin.)
Reply: Yes, it is required. I added a link in the latest push. |
||
# auth_method = "PLAIN" | ||
## Binding Key | ||
binding_key = "#" | ||
|
||
## Maximum number of messages server should give to the worker. | ||
Review comment: I would expand on this a bit, using language directly from either the streadway AMQP library or from the RabbitMQ documentation. You can also link to external documentation here.
Reply: (especially mention that this is a channel QoS setting) |
||
prefetch = 50 | ||
|
||
## Optional SSL Config | ||
# ssl_ca = "/etc/telegraf/ca.pem" | ||
# ssl_cert = "/etc/telegraf/cert.pem" | ||
# ssl_key = "/etc/telegraf/key.pem" | ||
## Use SSL but skip chain & host verification | ||
# insecure_skip_verify = false | ||
|
||
## Data format to output. | ||
## Each data format has its own unique set of configuration options, read | ||
## more about them here: | ||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md | ||
data_format = "influx" | ||
Review comment: You should put the standard comment in here that is in other plugins. |
||
` | ||
} | ||
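The `prefetch` option above is a channel QoS setting: it caps how many delivered but not yet acknowledged messages the consumer may hold at once. A minimal, library-free model of that window is sketched below. It is an illustration under stated assumptions, not the plugin's code: `simulatePrefetch` and its counters are hypothetical names, broker and consumer are folded into one loop, and a prefetch of 0 is treated as "no limit", which is how AMQP 0.9.1 defines a prefetch count of zero.

```go
package main

import "fmt"

// simulatePrefetch models a broker pushing total messages to a consumer that
// acknowledges them one at a time. The broker never lets more than prefetch
// messages sit unacknowledged. It returns how many messages were acked and the
// largest number of unacked messages seen at any point.
// Hypothetical helper: it does not talk to a real broker.
func simulatePrefetch(total, prefetch int) (acked, maxUnacked int) {
	if prefetch <= 0 {
		// A prefetch count of 0 means "no specific limit" in AMQP 0.9.1.
		prefetch = total
	}
	sent, unacked := 0, 0
	for acked < total {
		// Broker side: deliver until the prefetch window is full.
		for sent < total && unacked < prefetch {
			sent++
			unacked++
		}
		if unacked > maxUnacked {
			maxUnacked = unacked
		}
		// Consumer side: process one message and ack it, freeing one slot.
		unacked--
		acked++
	}
	return acked, maxUnacked
}

func main() {
	acked, peak := simulatePrefetch(200, 50)
	fmt.Printf("acked=%d peak unacked=%d\n", acked, peak)
}
```

With the default of 50, a backlog of 200 messages never puts more than 50 in flight; each ack lets the broker deliver one more.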
|
||
func (a *AMQPConsumer) Description() string { | ||
return "AMQP consumer plugin" | ||
} | ||
|
||
func (a *AMQPConsumer) SetParser(parser parsers.Parser) { | ||
a.parser = parser | ||
} | ||
|
||
// All gathering is done in the Start function | ||
func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error { | ||
return nil | ||
} | ||
|
||
func (a *AMQPConsumer) createConfig() (*amqp.Config, error) { | ||
// make new tls config | ||
tls, err := internal.GetTLSConfig( | ||
a.SSLCert, a.SSLKey, a.SSLCA, a.InsecureSkipVerify) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// parse auth method | ||
var sasl []amqp.Authentication // nil by default | ||
|
||
if strings.ToUpper(a.AuthMethod) == "EXTERNAL" { | ||
sasl = []amqp.Authentication{&externalAuth{}} | ||
} | ||
|
||
config := amqp.Config{ | ||
TLSClientConfig: tls, | ||
SASL: sasl, // if nil, it will be PLAIN | ||
} | ||
return &config, nil | ||
} | ||
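The selection logic in `createConfig` can be tried without a broker. The sketch below is an assumption-laden illustration: the `authentication` interface is declared locally to mirror the two methods that `externalAuth` implements in this diff, `plainAuth` is a hypothetical stand-in for the library's PLAIN mechanism (using the standard SASL PLAIN layout of NUL, user, NUL, password), and `selectAuth` is a made-up helper name.

```go
package main

import (
	"fmt"
	"strings"
)

// authentication mirrors the two-method shape that externalAuth satisfies in
// the plugin. It is declared here so the sketch has no third-party imports.
type authentication interface {
	Mechanism() string
	Response() string
}

// externalAuth matches the type in the diff: SASL EXTERNAL with an empty identity.
type externalAuth struct{}

func (externalAuth) Mechanism() string { return "EXTERNAL" }
func (externalAuth) Response() string  { return "\000" }

// plainAuth is a hypothetical stand-in for the library's PLAIN mechanism.
type plainAuth struct{ user, pass string }

func (plainAuth) Mechanism() string { return "PLAIN" }

// Response uses the SASL PLAIN layout: empty authzid, NUL, user, NUL, password.
func (p plainAuth) Response() string { return "\000" + p.user + "\000" + p.pass }

// selectAuth picks a mechanism from the auth_method setting, ignoring case.
// Anything other than EXTERNAL falls back to PLAIN, which is what the plugin
// gets by leaving the SASL slice nil.
func selectAuth(method, user, pass string) authentication {
	if strings.EqualFold(method, "EXTERNAL") {
		return externalAuth{}
	}
	return plainAuth{user: user, pass: pass}
}

func main() {
	for _, m := range []string{"PLAIN", "external", ""} {
		a := selectAuth(m, "guest", "guest")
		fmt.Printf("auth_method=%q -> %s %q\n", m, a.Mechanism(), a.Response())
	}
}
```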
|
||
// Start satisfies the telegraf.ServiceInput interface | ||
func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error { | ||
amqpConf, err := a.createConfig() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
conn, err := amqp.DialConfig(a.URL, *amqpConf) | ||
if err != nil { | ||
return err | ||
} | ||
a.conn = conn | ||
|
||
// Create channel and assign it to AMQPConsumer | ||
ch, err := conn.Channel() | ||
if err != nil { | ||
return fmt.Errorf("%v: Failed to open a channel", err) | ||
} | ||
a.ch = ch | ||
|
||
err = ch.ExchangeDeclare( | ||
a.Exchange, // name | ||
"topic", // type | ||
true, // durable | ||
false, // auto-deleted | ||
false, // internal | ||
false, // no-wait | ||
nil, // arguments | ||
) | ||
if err != nil { | ||
return fmt.Errorf("Failed to declare an exchange: %s", err) | ||
Review comment: This error message is inconsistent with the rest. Should be: return fmt.Errorf("%v: Failed to declare an exchange", err)
Reply: I usually use […]
Reply: I'll stick to the current convention of "Failed to blah blah: %s" |
||
} | ||
|
||
// Declare a queue and assign it to AMQPConsumer | ||
q, err := ch.QueueDeclare( | ||
"telegraf", // queue | ||
Review comment: Should the queue name be configurable? What if a user wanted to consume from two different Telegraf instances?
Reply: I added support for this. |
||
true, // durable | ||
false, // delete when unused | ||
false, // exclusive | ||
false, // no-wait | ||
nil, // arguments | ||
) | ||
if err != nil { | ||
return fmt.Errorf("%v: Failed to declare a queue", err) | ||
} | ||
|
||
err = ch.QueueBind( | ||
q.Name, // queue | ||
a.BindingKey, // binding-key | ||
a.Exchange, // exchange | ||
false, | ||
nil, | ||
) | ||
if err != nil { | ||
return fmt.Errorf("%v: Failed to bind a queue", err) | ||
} | ||
|
||
// Set QoS (prefetch count) on the channel | ||
err = ch.Qos( | ||
a.Prefetch, | ||
0, // prefetch-size | ||
false, // global | ||
) | ||
if err != nil { | ||
return fmt.Errorf("%v: failed to set Qos", err) | ||
Review comment: Should be […] |
||
} | ||
|
||
msgs, err := a.ch.Consume( | ||
q.Name, // queue | ||
"", // consumer | ||
false, // auto-ack | ||
false, // exclusive | ||
false, // no-local | ||
false, // no-wait | ||
nil, // arguments | ||
) | ||
if err != nil { | ||
return fmt.Errorf("%v: failed establishing connection to queue", err) | ||
} | ||
|
||
a.wg = &sync.WaitGroup{} | ||
a.wg.Add(1) | ||
go a.process(msgs, acc) | ||
|
||
// Log that service has started | ||
log.Println("I! Starting AMQP service...") | ||
return nil | ||
} | ||
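The review thread in `Start` is about two ways of formatting the same error. The sketch below puts them side by side, plus a third option that is newer than this PR: wrapping with `%w` (Go 1.13 and later), which keeps the underlying error inspectable. All names here (`errRefused`, `prefixStyle`, `suffixStyle`, `wrapStyle`, `keepsCause`) are hypothetical and exist only for the comparison.

```go
package main

import (
	"errors"
	"fmt"
)

// errRefused is a hypothetical underlying error used for the comparison.
var errRefused = errors.New("connection refused")

// prefixStyle puts the cause first, as most messages in this file do.
func prefixStyle(err error) error {
	return fmt.Errorf("%v: Failed to declare an exchange", err)
}

// suffixStyle puts the cause last, the "Failed to ...: %s" convention.
func suffixStyle(err error) error {
	return fmt.Errorf("Failed to declare an exchange: %s", err)
}

// wrapStyle uses %w so callers can still match the cause with errors.Is.
// This verb did not exist when the PR was written; it is shown as an option only.
func wrapStyle(err error) error {
	return fmt.Errorf("failed to declare an exchange: %w", err)
}

// keepsCause reports whether the original error is still reachable.
func keepsCause(err error) bool {
	return errors.Is(err, errRefused)
}

func main() {
	fmt.Println(prefixStyle(errRefused))
	fmt.Println(suffixStyle(errRefused))
	fmt.Println(wrapStyle(errRefused), "| cause kept:", keepsCause(wrapStyle(errRefused)))
}
```

Whichever message order is chosen, the point raised in review is consistency across the plugin; the `%v` and `%s` forms both flatten the cause into text.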
|
||
// Read messages from queue and add them to the Accumulator | ||
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { | ||
defer a.wg.Done() | ||
for d := range msgs { | ||
Review comment: What happens here when the channel is closed? Does the loop just cleanly exit? How does that happen? I would have thought it'd start sending […]
Reply: "When the channel or connection closes, all delivery chans will also close." - https://godoc.org/github.com/streadway/amqp#Channel.Consume |
||
metric, err := a.parser.Parse(d.Body) | ||
Review comment: nit: […] |
||
if err != nil { | ||
log.Printf("E! %v: error parsing metric - %v", err, string(d.Body)) | ||
} else { | ||
for _, m := range metric { | ||
acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) | ||
} | ||
} | ||
|
||
d.Ack(false) | ||
} | ||
log.Println("I! Stopped AMQP service") | ||
} | ||
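The question raised on the `for d := range msgs` loop comes down to plain Go channel semantics: a `range` over a channel ends once the channel is closed and drained. The sketch below shows that with a stand-in delivery type and no AMQP library; `delivery`, `consume`, and `run` are hypothetical names, and `close(msgs)` plays the role of the library closing the delivery channel.

```go
package main

import (
	"fmt"
	"sync"
)

// delivery is a hypothetical stand-in for a message received from the broker.
type delivery struct{ body string }

// consume handles messages until msgs is closed, then returns.
// It mirrors the shape of process: defer Done, range over the channel.
func consume(msgs <-chan delivery, wg *sync.WaitGroup, handled *int) {
	defer wg.Done()
	for d := range msgs {
		_ = d.body // a real consumer would parse and ack here
		*handled++
	}
	// Reaching this point means msgs was closed and fully drained.
}

// run feeds bodies to a consumer, closes the channel, waits for the consumer
// to exit, and returns how many messages it handled.
func run(bodies []string) int {
	msgs := make(chan delivery)
	var wg sync.WaitGroup
	handled := 0

	wg.Add(1)
	go consume(msgs, &wg, &handled)

	for _, b := range bodies {
		msgs <- delivery{body: b}
	}
	close(msgs) // stands in for the channel or connection closing
	wg.Wait()   // returns once the range loop has exited
	return handled
}

func main() {
	fmt.Println("handled:", run([]string{"cpu value=1", "mem value=2", "disk value=3"}))
}
```

This is also why `Stop` can close the AMQP channel first and then call `wg.Wait()`: the worker exits on its own once its delivery channel closes.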
|
||
func (a *AMQPConsumer) Stop() { | ||
a.Lock() | ||
defer a.Unlock() | ||
Review comment: should we call […] Could be no, TBH I don't really understand the difference between "Channel.Cancel" and "Channel.Close"
Reply: FWIW, there's probably nothing to change here, this example code from the library itself simply closes the connection, which "takes the channel with it": https://github.com/streadway/amqp/blob/master/_examples/pubsub/pubsub.go#L37 (note that the […]
Reply: and with that being said, another example code calls Cancel and then Close: https://github.com/streadway/amqp/blob/master/_examples/simple-consumer/consumer.go#L141 but it calls Cancel with noWait=true, which is completely pointless AFAICT
Reply: I think it only matters if you will use multiple consumers on a single channel. We should be able to just close the connection as well. |
||
a.ch.Close() | ||
a.wg.Wait() | ||
a.conn.Close() | ||
} | ||
|
||
func init() { | ||
inputs.Add("amqp_consumer", func() telegraf.Input { | ||
return &AMQPConsumer{ | ||
AuthMethod: DefaultAuthMethod, | ||
Prefetch: DefaultPrefetch, | ||
} | ||
}) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,6 +40,7 @@ type AMQP struct { | |
// Use SSL but skip chain & host verification | ||
InsecureSkipVerify bool | ||
|
||
conn *amqp.Connection | ||
channel *amqp.Channel | ||
sync.Mutex | ||
headers amqp.Table | ||
|
@@ -129,6 +130,8 @@ func (q *AMQP) Connect() error { | |
if err != nil { | ||
return err | ||
} | ||
q.conn = connection | ||
|
||
channel, err := connection.Channel() | ||
if err != nil { | ||
return fmt.Errorf("Failed to open a channel: %s", err) | ||
Review comment: Error message inconsistent here as compared to others. |
||
|
@@ -160,7 +163,8 @@ func (q *AMQP) Connect() error { | |
} | ||
|
||
func (q *AMQP) Close() error { | ||
return q.channel.Close() | ||
q.channel.Close() | ||
Review comment: Check for close error and return |
||
return q.conn.Close() | ||
} | ||
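One way to act on the "check for close error" comment is to close both resources and return the first error encountered, without skipping the second close. The sketch below assumes only that each resource has a `Close() error` method, so it uses `io.Closer`; `fakeCloser`, `errChannel`, and `closeAll` are hypothetical names for illustration and are not part of the plugin or the AMQP library.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errChannel is a hypothetical failure returned by the first close.
var errChannel = errors.New("channel close failed")

// fakeCloser is a hypothetical io.Closer that records whether it was closed.
type fakeCloser struct {
	err    error
	closed bool
}

func (f *fakeCloser) Close() error {
	f.closed = true
	return f.err
}

// closeAll closes every closer in order. It keeps going after a failure so
// later resources are not leaked, and returns the first error it saw.
func closeAll(closers ...io.Closer) error {
	var first error
	for _, c := range closers {
		if err := c.Close(); err != nil && first == nil {
			first = err
		}
	}
	return first
}

func main() {
	channel := &fakeCloser{err: errChannel}
	conn := &fakeCloser{}
	err := closeAll(channel, conn)
	fmt.Println("error:", err, "| connection closed:", conn.closed)
}
```

The design choice here is to prefer cleanup over early return: returning as soon as the channel close fails would leave the connection open.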
|
||
func (q *AMQP) SampleConfig() string { | ||
|
@@ -205,6 +209,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { | |
Headers: q.headers, | ||
ContentType: "text/plain", | ||
Body: buf, | ||
// DeliveryMode: amqp.Persistent, | ||
Review comment: Should this comment be removed? |
||
}) | ||
if err != nil { | ||
return fmt.Errorf("FAILED to send amqp message: %s", err) | ||
Review comment: Error formatting is inconsistent here as compared to others. |
||
|
Review comment: Here I think we should mention that this plugin is for the AMQP 0.9.1 protocol, and link to the RabbitMQ docs about it: https://www.rabbitmq.com/amqp-0-9-1-reference.html and https://www.rabbitmq.com/tutorials/amqp-concepts.html
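Since the plugin declares a `topic` exchange and defaults `binding_key` to `#`, a small illustration of AMQP 0.9.1 topic matching may help readers pick a binding key. The sketch below is a standalone model, not broker or library code: `topicMatch`, `splitKey`, and `matchWords` are hypothetical names, and treating an empty key as zero words is an assumption of this sketch. The rules it implements are the standard ones: keys are dot-separated words, `*` matches exactly one word, and `#` matches zero or more words.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch reports whether routingKey would be routed to a queue bound with
// bindingKey on a topic exchange.
func topicMatch(bindingKey, routingKey string) bool {
	return matchWords(splitKey(bindingKey), splitKey(routingKey))
}

// splitKey breaks a key into dot-separated words.
// Assumption of this sketch: an empty key is treated as zero words.
func splitKey(key string) []string {
	if key == "" {
		return nil
	}
	return strings.Split(key, ".")
}

// matchWords compares pattern words against routing-key words recursively.
func matchWords(pattern, words []string) bool {
	if len(pattern) == 0 {
		return len(words) == 0
	}
	switch pattern[0] {
	case "#":
		// "#" either absorbs nothing, or absorbs one word and tries again.
		if matchWords(pattern[1:], words) {
			return true
		}
		return len(words) > 0 && matchWords(pattern, words[1:])
	case "*":
		// "*" must consume exactly one word.
		return len(words) > 0 && matchWords(pattern[1:], words[1:])
	default:
		return len(words) > 0 && pattern[0] == words[0] &&
			matchWords(pattern[1:], words[1:])
	}
}

func main() {
	for _, binding := range []string{"#", "cpu.*", "cpu.#"} {
		for _, routing := range []string{"cpu", "cpu.host1", "cpu.host1.idle", "mem.host1"} {
			fmt.Printf("%-6s vs %-15s -> %v\n", binding, routing, topicMatch(binding, routing))
		}
	}
}
```

The default `#` therefore receives every message published to the exchange, while a key such as `cpu.*` narrows consumption to two-word routing keys that start with `cpu`.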