Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQP Consumer plugin #1678

Merged
merged 8 commits into from
Mar 3, 2017
Merged

AMQP Consumer plugin #1678

merged 8 commits into from
Mar 3, 2017

Conversation

jackzampolin
Copy link
Contributor

@jackzampolin jackzampolin commented Aug 29, 2016

Required for all PRs:

  • CHANGELOG.md updated (we recommend not updating this until the PR has been approved by a maintainer)
  • Sign CLA (if not already signed)
  • README.md updated (if adding a new plugin)

@jwilder jwilder added the feature request Requests for new plugin and for new features to existing plugins label Sep 1, 2016
@nhaugo nhaugo self-assigned this Sep 1, 2016
@jackzampolin jackzampolin force-pushed the jz-rabbitmq-input branch 3 times, most recently from 411e325 to 79f071b Compare October 7, 2016 19:49
@hanej
Copy link

hanej commented Oct 10, 2016

Really looking forward to this PR so I can get the data back out of Rabbit

@nhaugo
Copy link
Contributor

nhaugo commented Oct 13, 2016

@hanej we are implementing a plugin system, this should get merged once that is complete.
see #1717 and #1718.

@nhaugo nhaugo removed their assignment Oct 13, 2016
@jackzampolin
Copy link
Contributor Author

@sparrc Any way we could get this in 1.2?

@sparrc
Copy link
Contributor

sparrc commented Jan 6, 2017

Sorry bur I think 1.2 is too soon as I haven't reviewed this yet and there is a large backlog. We can try for 1.3.

@sparrc sparrc added this to the 1.3.0 milestone Jan 6, 2017
Copy link
Contributor

@sparrc sparrc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments in review.

One other thing: I think we might want to change the AMQP output plugin to default to an exchange of "". We should also verify that both plugins can talk to each other with their default configurations.

func (rmq *AMQPConsumer) SampleConfig() string {
return `
# The following options form a connection string to amqp:
# amqp://{username}:{password}@{amqp_host}:{amqp_port}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change this so that users just need to specify they're server in this format

amqp_port = "5672"
# name of the queue to consume from
queue = "task_queue"
prefetch = 1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spacing and what does this option do?

}

// listen(s) for new messages coming in from AMQP and pawns them off to handleMessage
func (rmq *AMQPConsumer) listen(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably wait for this function to exit when calling Stop()

func (rmq *AMQPConsumer) registerConsumer() <-chan amqp.Delivery {
messages, err := rmq.ch.Consume(rmq.Queue, "", false, false, false, false, nil)
if err != nil {
panic(fmt.Errorf("%v: failed establishing connection to queue", err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't panic

// listen(s) for new messages coming in from AMQP and pawns them off to handleMessage
func (rmq *AMQPConsumer) listen(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
for d := range msgs {
go handleMessage(d, acc, rmq.parser)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a go-routine per message is not a good idea, this will be too much in high-traffic scenarios. Would be better to just handle messages serially.

func handleMessage(d amqp.Delivery, acc telegraf.Accumulator, parser parsers.Parser) {
metric, err := parser.Parse(d.Body)
if err != nil {
log.Fatalf("%v: error parsing metric - %v", err, string(d.Body))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont use log.Fatal, this should be a log.Print and the message should start with E! ...

@@ -0,0 +1,142 @@
package rabbitmqConsumer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are there two of the same plugin? couldn't we only do an amqp consumer?

queue = "task_queue"
prefetch = 1000

data_format = "influx"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should put the standard comment in here that is in other plugins

@sparrc
Copy link
Contributor

sparrc commented Jan 13, 2017

last thing is that we should probably mention in the documentation, default config, and README (including the main repo README) that AMQP == RabbitMQ.

@danielnelson
Copy link
Contributor

I added support for a topic exchange, since that is what the output plugin uses, as well as the authentication options that the output plugin has (custom TLS and SASL)

@danielnelson danielnelson requested a review from goller March 1, 2017 22:43
Copy link
Contributor

@goller goller left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor changes in formatting and such.

@@ -205,6 +209,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
Headers: q.headers,
ContentType: "text/plain",
Body: buf,
// DeliveryMode: amqp.Persistent,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this comment be removed?

nil, // arguments
)
if err != nil {
return fmt.Errorf("Failed to declare an exchange: %s", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error message is inconsistent with the rest. Should be:

return fmt.Errorf("%v: Failed to declare an exchange", err)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I usually use %s for errors, is there a benefit to using %v? if err != nil then the string format will always work properly, I believe it will just print whatever err.Error() returns

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll stick to the current convention of "Failed to blah blah: %s"

a.Prefetch,
0, // prefetch-size
false, // global
)
if err != nil {
return fmt.Errorf("%v: failed to set Qos", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be QoS in error message.

@@ -160,7 +163,8 @@ func (q *AMQP) Connect() error {
}

func (q *AMQP) Close() error {
return q.channel.Close()
q.channel.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check for close error and return

@@ -205,6 +209,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
Headers: q.headers,
ContentType: "text/plain",
Body: buf,
// DeliveryMode: amqp.Persistent,
})
if err != nil {
return fmt.Errorf("FAILED to send amqp message: %s", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error formatting inconsistent here are compared to others

@@ -129,6 +130,8 @@ func (q *AMQP) Connect() error {
if err != nil {
return err
}
q.conn = connection

channel, err := connection.Channel()
if err != nil {
return fmt.Errorf("Failed to open a channel: %s", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error message inconsistent here as compared to others.

## Binding Key
binding_key = "#"

## Maximum number of messages server should give to the worker.
Copy link
Contributor

@sparrc sparrc Mar 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expand on this a bit, using language directly from either the streadway AMQP library or from the RabbitMQ documentation.

You can also link to external documentation here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(especially mention that this is a channel QoS setting)

@@ -0,0 +1,35 @@
# AMQP Consumer Input Plugin

This plugin reads data from an AMQP Queue ([RabbitMQ](https://www.rabbitmq.com/) being an example) formatted in one of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I think we should mention that this plugin is for the AMQP 0.9.1 protocol, and link to the RabbitMQ doc about it: https://www.rabbitmq.com/amqp-0-9-1-reference.html & https://www.rabbitmq.com/tutorials/amqp-concepts.html


// Declare a queue and assign it to AMQPConsumer
q, err := ch.QueueDeclare(
"telegraf", // queue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the queue name be configurable? what if a user wanted to consume from two different telegraf instances?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added support for this.

url = "amqp://localhost:5672/influxdb"
## AMQP exchange
exchange = "telegraf"
## Auth method. PLAIN and EXTERNAL are supported
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does EXTERNAL require the AMQP instance to have a specific plugin enabled? could you link to a doc about this? (I could be wrong, but https://www.rabbitmq.com/authentication.html seems to suggest it requires a plugin)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is required, I added a link in the latest push.


The following defaults are set to work with RabbitMQ:

```
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put toml after the ```, like:

```toml
...

this will tell github to colorize the readme

func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
defer a.wg.Done()
for d := range msgs {
metric, err := a.parser.Parse(d.Body)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: metric -> metrics


func (a *AMQPConsumer) Stop() {
a.Lock()
defer a.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we call a.ch.Cancel()? https://godoc.org/github.com/streadway/amqp#Channel.Cancel

Could be no, TBH I don't really understand the difference between "Channel.Cancel" and "Channel.Close"

Copy link
Contributor

@sparrc sparrc Mar 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, there's probably nothing to change here, this example code from the library itself simply closes the connection, which "takes the channel with it": https://github.com/streadway/amqp/blob/master/_examples/pubsub/pubsub.go#L37

(note that the session struct in that example has an s.Channel object defined, but doesn't get used in the Close() func)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and with that being said, another example code calls Cancel and then Close: https://github.com/streadway/amqp/blob/master/_examples/simple-consumer/consumer.go#L141

but it calls Cancel with noWait=true, which is completely pointless AFAICT

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it only matters if you will use multiple consumers on a single channel. We should be able to just close the connection as well.

// Read messages from queue and add them to the Accumulator
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
defer a.wg.Done()
for d := range msgs {
Copy link
Contributor

@sparrc sparrc Mar 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens here when the channel is closed? does the loop just cleanly exit? how does that happen?

I would have thought it'd start sending nils?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"When the channel or connection closes, all delivery chans will also close. " - https://godoc.org/github.com/streadway/amqp#Channel.Consume

@danielnelson
Copy link
Contributor

I believe all the concerns above are addressed, and additionally I added support for reconnecting. So there are quite a few changes, can you re-review?

@sparrc
Copy link
Contributor

sparrc commented Mar 3, 2017

LGTM, nice work!

@danielnelson danielnelson merged commit 1074464 into master Mar 3, 2017
@danielnelson danielnelson deleted the jz-rabbitmq-input branch March 3, 2017 18:24
ssorathia pushed a commit to ssorathia/telegraf that referenced this pull request Mar 25, 2017
vlamug pushed a commit to vlamug/telegraf that referenced this pull request May 30, 2017
maxunt pushed a commit that referenced this pull request Jun 26, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request Requests for new plugin and for new features to existing plugins
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants