Skip to content

Commit

Permalink
Add queue name and reconnect support to amqp_consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson committed Mar 2, 2017
1 parent b7f217c commit d4693ee
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 61 deletions.
28 changes: 20 additions & 8 deletions plugins/inputs/amqp_consumer/README.md
Original file line number Diff line number Diff line change
@@ -1,25 +1,37 @@
# AMQP Consumer Input Plugin

This plugin reads data from an AMQP Queue ([RabbitMQ](https://www.rabbitmq.com/) being an example) formatted in one of
the [Telegraf Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
This plugin provides a consumer for use with AMQP 0-9-1, a prominent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).

The following defaults are set to work with RabbitMQ:
Metrics are read from a topic exchange using the configured queue and binding_key.

```
Message payload should be formatted in one of the [Telegraf Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).

For an introduction to AMQP see:
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
- https://www.rabbitmq.com/getstarted.html

The following defaults are known to work with RabbitMQ:

```toml
# AMQP consumer plugin
[[inputs.amqp_consumer]]
## AMQP url
url = "amqp://localhost:5672/influxdb"
## AMQP exchange
exchange = "telegraf"
## Auth method. PLAIN and EXTERNAL are supported
# auth_method = "PLAIN"
## AMQP queue name
queue = "telegraf"
## Binding Key
binding_key = "#"

## Maximum number of messages server should give to the worker.
prefetch = 50
## Controls how many messages the server will try to keep on the network
## for consumers before receiving delivery acks.
#prefetch_count = 50

## Auth method. PLAIN and EXTERNAL are supported.
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
Expand Down
128 changes: 81 additions & 47 deletions plugins/inputs/amqp_consumer/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"strings"
"sync"
"time"

"github.com/streadway/amqp"

Expand All @@ -19,13 +20,17 @@ type AMQPConsumer struct {
URL string
// AMQP exchange
Exchange string

// AMQP Auth method
AuthMethod string
// Queue Name
Queue string
// Binding Key
BindingKey string `toml:"binding_key"`

Prefetch int
// Controls how many messages the server will try to keep on the network
// for consumers before receiving delivery acks.
PrefetchCount int

// AMQP Auth method
AuthMethod string
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
Expand All @@ -35,11 +40,8 @@ type AMQPConsumer struct {
// Use SSL but skip chain & host verification
InsecureSkipVerify bool

sync.Mutex

parser parsers.Parser
conn *amqp.Connection
ch *amqp.Channel
wg *sync.WaitGroup
}

Expand All @@ -53,8 +55,8 @@ func (a *externalAuth) Response() string {
}

const (
DefaultAuthMethod = "PLAIN"
DefaultPrefetch = 50
DefaultAuthMethod = "PLAIN"
DefaultPrefetchCount = 50
)

func (a *AMQPConsumer) SampleConfig() string {
Expand All @@ -63,13 +65,18 @@ func (a *AMQPConsumer) SampleConfig() string {
url = "amqp://localhost:5672/influxdb"
## AMQP exchange
exchange = "telegraf"
## Auth method. PLAIN and EXTERNAL are supported
# auth_method = "PLAIN"
## AMQP queue name
queue = "telegraf"
## Binding Key
binding_key = "#"
## Maximum number of messages server should give to the worker.
prefetch = 50
prefetch_count = 50
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
Expand Down Expand Up @@ -128,18 +135,50 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
return err
}

conn, err := amqp.DialConfig(a.URL, *amqpConf)
msgs, err := a.connect(amqpConf)
if err != nil {
return err
}

a.wg = &sync.WaitGroup{}
a.wg.Add(1)
go a.process(msgs, acc)

go func() {
err := <-a.conn.NotifyClose(make(chan *amqp.Error))
if err == nil {
return
}

log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err)
for {
msgs, err := a.connect(amqpConf)
if err != nil {
log.Printf("E! AMQP connection failed: %s", err)
time.Sleep(10 * time.Second)
continue
}

a.wg.Add(1)
go a.process(msgs, acc)
break
}
}()

return nil
}

func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) {
conn, err := amqp.DialConfig(a.URL, *amqpConf)
if err != nil {
return nil, err
}
a.conn = conn

// Create channel and assign it to AMQPConsumer
ch, err := conn.Channel()
if err != nil {
return fmt.Errorf("%v: Failed to open a channel", err)
return nil, fmt.Errorf("Failed to open a channel: %s", err)
}
a.ch = ch

err = ch.ExchangeDeclare(
a.Exchange, // name
Expand All @@ -151,20 +190,19 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
nil, // arguments
)
if err != nil {
return fmt.Errorf("Failed to declare an exchange: %s", err)
return nil, fmt.Errorf("Failed to declare an exchange: %s", err)
}

// Declare a queue and assign it to AMQPConsumer
q, err := ch.QueueDeclare(
"telegraf", // queue
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
a.Queue, // queue
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("%v: Failed to declare a queue", err)
return nil, fmt.Errorf("Failed to declare a queue: %s", err)
}

err = ch.QueueBind(
Expand All @@ -175,20 +213,19 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
nil,
)
if err != nil {
return fmt.Errorf("%v: Failed to bind a queue", err)
return nil, fmt.Errorf("Failed to bind a queue: %s", err)
}

// Declare QoS on queue
err = ch.Qos(
a.Prefetch,
a.PrefetchCount,
0, // prefetch-size
false, // global
)
if err != nil {
return fmt.Errorf("%v: failed to set Qos", err)
return nil, fmt.Errorf("Failed to set QoS: %s", err)
}

msgs, err := a.ch.Consume(
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
Expand All @@ -198,49 +235,46 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
nil, // arguments
)
if err != nil {
return fmt.Errorf("%v: failed establishing connection to queue", err)
return nil, fmt.Errorf("Failed establishing connection to queue: %s", err)
}

a.wg = &sync.WaitGroup{}
a.wg.Add(1)
go a.process(msgs, acc)

// Log that service has started
log.Println("I! Starting AMQP service...")
return nil
log.Println("I! Started AMQP consumer")
return msgs, err
}

// Read messages from queue and add them to the Accumulator
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
defer a.wg.Done()
for d := range msgs {
metric, err := a.parser.Parse(d.Body)
metrics, err := a.parser.Parse(d.Body)
if err != nil {
log.Printf("E! %v: error parsing metric - %v", err, string(d.Body))
} else {
for _, m := range metric {
for _, m := range metrics {
acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
}

d.Ack(false)
}
log.Println("I! Stopped AMQP service")
log.Printf("I! AMQP consumer queue closed")
}

func (a *AMQPConsumer) Stop() {
a.Lock()
defer a.Unlock()
a.ch.Close()
err := a.conn.Close()
if err != nil && err != amqp.ErrClosed {
log.Printf("E! Error closing AMQP connection: %s", err)
return
}
a.wg.Wait()
a.conn.Close()
log.Println("I! Stopped AMQP service")
}

func init() {
inputs.Add("amqp_consumer", func() telegraf.Input {
return &AMQPConsumer{
AuthMethod: DefaultAuthMethod,
Prefetch: DefaultPrefetch,
AuthMethod: DefaultAuthMethod,
PrefetchCount: DefaultPrefetchCount,
}
})
}
10 changes: 9 additions & 1 deletion plugins/outputs/amqp/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
# AMQP Output Plugin

This plugin writes to a AMQP exchange, like [RabbitMQ](https://www.rabbitmq.com/) using tag, defined in configuration file as RoutingTag, as a routing key.
This plugin writes to an AMQP 0-9-1 exchange, a prominent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).

Metrics are written to a topic exchange, using the value of the tag named by RoutingTag in the configuration file as the routing key.

If RoutingTag is empty, an empty routing key will be used.
Metrics are grouped in batches by RoutingTag.

This plugin doesn't bind the exchange to a queue, so the binding should be done by the consumer.

For an introduction to AMQP see:
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
- https://www.rabbitmq.com/getstarted.html

### Configuration:

```
Expand All @@ -17,6 +23,8 @@ This plugin doesn't bind exchange to a queue, so it should be done by consumer.
## AMQP exchange
exchange = "telegraf"
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
Expand Down
19 changes: 14 additions & 5 deletions plugins/outputs/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ var sampleConfig = `
## AMQP exchange
exchange = "telegraf"
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Telegraf tag to use as a routing key
## ie, if this tag exists, it's value will be used as the routing key
Expand Down Expand Up @@ -151,7 +153,11 @@ func (q *AMQP) Connect() error {
}
q.channel = channel
go func() {
log.Printf("I! Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error)))
err := <-connection.NotifyClose(make(chan *amqp.Error))
if err == nil {
return
}
log.Printf("I! Closing: %s", err)
log.Printf("I! Trying to reconnect")
for err := q.Connect(); err != nil; err = q.Connect() {
log.Println("E! ", err.Error())
Expand All @@ -163,8 +169,12 @@ func (q *AMQP) Connect() error {
}

func (q *AMQP) Close() error {
q.channel.Close()
return q.conn.Close()
err := q.conn.Close()
if err != nil && err != amqp.ErrClosed {
log.Printf("E! Error closing AMQP connection: %s", err)
return err
}
return nil
}

func (q *AMQP) SampleConfig() string {
Expand Down Expand Up @@ -209,10 +219,9 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
Headers: q.headers,
ContentType: "text/plain",
Body: buf,
// DeliveryMode: amqp.Persistent,
})
if err != nil {
return fmt.Errorf("FAILED to send amqp message: %s", err)
return fmt.Errorf("Failed to send AMQP message: %s", err)
}
}
return nil
Expand Down

0 comments on commit d4693ee

Please sign in to comment.