Skip to content

Commit

Permalink
feat(nsq_consumer): Add input plugin to consume metrics from an nsqd …
Browse files Browse the repository at this point in the history
…topic
  • Loading branch information
Jonathan Chauncey committed Jun 13, 2016
1 parent 4d24283 commit 6162030
Show file tree
Hide file tree
Showing 6 changed files with 372 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
### Features

- [#1340](https://github.com/influxdata/telegraf/issues/1340): statsd: do not log every dropped metric.
- [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD.


### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ Telegraf can also collect metrics via the following service plugins:
* [mqtt_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/mqtt_consumer)
* [kafka_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/kafka_consumer)
* [nats_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nats_consumer)
* [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer)
* [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks)
* [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks)

Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/net_response"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/nstat"
_ "github.com/influxdata/telegraf/plugins/inputs/ntpq"
_ "github.com/influxdata/telegraf/plugins/inputs/passenger"
Expand Down
26 changes: 26 additions & 0 deletions plugins/inputs/nsq_consumer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# NSQ Consumer Input Plugin

The [NSQ](http://nsq.io/) consumer plugin polls a specified NSQD
topic and adds messages to InfluxDB. The plugin assumes messages follow the
line protocol.

## Configuration

```toml
# Read metrics from NSQD topic(s)
[[inputs.nsq_consumer]]
## An array of NSQD HTTP API endpoints
server = "localhost:4150"
topic = "telegraf"
channel = "consumer"
max_in_flight = 100

## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```

## Testing
The `nsq_consumer_test` mocks out the interaction with `NSQD`. It requires no outside dependencies.
98 changes: 98 additions & 0 deletions plugins/inputs/nsq_consumer/nsq_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package nsq_consumer

import (
"log"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/nsqio/go-nsq"
)

//NSQConsumer represents the configuration of the plugin
type NSQConsumer struct {
Server string
Topic string
Channel string
MaxInFlight int
parser parsers.Parser
consumer *nsq.Consumer
acc telegraf.Accumulator
}

var sampleConfig = `
## An array of NSQD HTTP API endpoints
server = "localhost:4150"
topic = "telegraf"
channel = "consumer"
max_in_flight = 100
## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`

func init() {
inputs.Add("nsq_consumer", func() telegraf.Input {
return &NSQConsumer{}
})
}

// SetParser takes the data_format from the config and finds the right parser for that format
func (nsqConsumer *NSQConsumer) SetParser(parser parsers.Parser) {
nsqConsumer.parser = parser
}

// SampleConfig returns config values for generating a sample configuration file
func (nsqConsumer *NSQConsumer) SampleConfig() string {
return sampleConfig
}

// Description prints description string
func (nsqConsumer *NSQConsumer) Description() string {
return "Read NSQ topic for metrics."
}

// Start pulls data from nsq
func (nsqConsumer *NSQConsumer) Start(acc telegraf.Accumulator) error {
nsqConsumer.acc = acc
nsqConsumer.connect()
nsqConsumer.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
metrics, err := nsqConsumer.parser.Parse(message.Body)
if err != nil {
log.Printf("NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error())
}
for _, metric := range metrics {
nsqConsumer.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
message.Finish()
return nil
}), nsqConsumer.MaxInFlight)
nsqConsumer.consumer.ConnectToNSQD(nsqConsumer.Server)
return nil
}

// Stop processing messages
func (nsqConsumer *NSQConsumer) Stop() {
nsqConsumer.consumer.Stop()
}

// Gather is a noop
func (nsqConsumer *NSQConsumer) Gather(acc telegraf.Accumulator) error {
return nil
}

func (nsqConsumer *NSQConsumer) connect() error {
if nsqConsumer.consumer == nil {
config := nsq.NewConfig()
config.MaxInFlight = nsqConsumer.MaxInFlight
consumer, err := nsq.NewConsumer(nsqConsumer.Topic, nsqConsumer.Channel, config)
if err != nil {
return err
}
nsqConsumer.consumer = consumer
}
return nil
}
244 changes: 244 additions & 0 deletions plugins/inputs/nsq_consumer/nsq_consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
package nsq_consumer

import (
"bufio"
"bytes"
"encoding/binary"
"io"
"log"
"net"
"strconv"
"testing"
"time"

"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/nsqio/go-nsq"
"github.com/stretchr/testify/assert"
)

// This test is modeled after the kafka consumer integration test
func TestReadsMetricsFromNSQ(t *testing.T) {
msgID := nsq.MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msg := nsq.NewMessage(msgID, []byte("cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"))

script := []instruction{
// SUB
instruction{0, nsq.FrameTypeResponse, []byte("OK")},
// IDENTIFY
instruction{0, nsq.FrameTypeResponse, []byte("OK")},
instruction{20 * time.Millisecond, nsq.FrameTypeMessage, frameMessage(msg)},
// needed to exit test
instruction{100 * time.Millisecond, -1, []byte("exit")},
}

addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:4150")
newMockNSQD(script, addr.String())

consumer := &NSQConsumer{
Server: "127.0.0.1:4150",
Topic: "telegraf",
Channel: "consume",
MaxInFlight: 1,
}

p, _ := parsers.NewInfluxParser()
consumer.SetParser(p)
var acc testutil.Accumulator
assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
if err := consumer.Start(&acc); err != nil {
t.Fatal(err.Error())
} else {
defer consumer.Stop()
}

waitForPoint(&acc, t)

if len(acc.Metrics) == 1 {
point := acc.Metrics[0]
assert.Equal(t, "cpu_load_short", point.Measurement)
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
assert.Equal(t, map[string]string{
"host": "server01",
"direction": "in",
"region": "us-west",
}, point.Tags)
assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix())
} else {
t.Errorf("No points found in accumulator, expected 1")
}

}

// Waits for the metric that was sent to the kafka broker to arrive at the kafka
// consumer
func waitForPoint(acc *testutil.Accumulator, t *testing.T) {
// Give the kafka container up to 2 seconds to get the point to the consumer
ticker := time.NewTicker(5 * time.Millisecond)
counter := 0
for {
select {
case <-ticker.C:
counter++
if counter > 1000 {
t.Fatal("Waited for 5s, point never arrived to consumer")
} else if acc.NFields() == 1 {
return
}
}
}
}

func newMockNSQD(script []instruction, addr string) *mockNSQD {
n := &mockNSQD{
script: script,
exitChan: make(chan int),
}

tcpListener, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("FATAL: listen (%s) failed - %s", n.tcpAddr.String(), err)
}
n.tcpListener = tcpListener
n.tcpAddr = tcpListener.Addr().(*net.TCPAddr)

go n.listen()

return n
}

// The code below allows us to mock the interactions with nsqd. This is taken from:
// https://github.com/nsqio/go-nsq/blob/master/mock_test.go
type instruction struct {
delay time.Duration
frameType int32
body []byte
}

type mockNSQD struct {
script []instruction
got [][]byte
tcpAddr *net.TCPAddr
tcpListener net.Listener
exitChan chan int
}

func (n *mockNSQD) listen() {
for {
conn, err := n.tcpListener.Accept()
if err != nil {
break
}
go n.handle(conn)
}
close(n.exitChan)
}

func (n *mockNSQD) handle(conn net.Conn) {
var idx int
buf := make([]byte, 4)
_, err := io.ReadFull(conn, buf)
if err != nil {
log.Fatalf("ERROR: failed to read protocol version - %s", err)
}

readChan := make(chan []byte)
readDoneChan := make(chan int)
scriptTime := time.After(n.script[0].delay)
rdr := bufio.NewReader(conn)

go func() {
for {
line, err := rdr.ReadBytes('\n')
if err != nil {
return
}
// trim the '\n'
line = line[:len(line)-1]
readChan <- line
<-readDoneChan
}
}()

var rdyCount int
for idx < len(n.script) {
select {
case line := <-readChan:
n.got = append(n.got, line)
params := bytes.Split(line, []byte(" "))
switch {
case bytes.Equal(params[0], []byte("IDENTIFY")):
l := make([]byte, 4)
_, err := io.ReadFull(rdr, l)
if err != nil {
log.Printf(err.Error())
goto exit
}
size := int32(binary.BigEndian.Uint32(l))
b := make([]byte, size)
_, err = io.ReadFull(rdr, b)
if err != nil {
log.Printf(err.Error())
goto exit
}
case bytes.Equal(params[0], []byte("RDY")):
rdy, _ := strconv.Atoi(string(params[1]))
rdyCount = rdy
case bytes.Equal(params[0], []byte("FIN")):
case bytes.Equal(params[0], []byte("REQ")):
}
readDoneChan <- 1
case <-scriptTime:
inst := n.script[idx]
if bytes.Equal(inst.body, []byte("exit")) {
goto exit
}
if inst.frameType == nsq.FrameTypeMessage {
if rdyCount == 0 {
scriptTime = time.After(n.script[idx+1].delay)
continue
}
rdyCount--
}
_, err := conn.Write(framedResponse(inst.frameType, inst.body))
if err != nil {
log.Printf(err.Error())
goto exit
}
scriptTime = time.After(n.script[idx+1].delay)
idx++
}
}

exit:
n.tcpListener.Close()
conn.Close()
}

func framedResponse(frameType int32, data []byte) []byte {
var w bytes.Buffer

beBuf := make([]byte, 4)
size := uint32(len(data)) + 4

binary.BigEndian.PutUint32(beBuf, size)
_, err := w.Write(beBuf)
if err != nil {
return nil
}

binary.BigEndian.PutUint32(beBuf, uint32(frameType))
_, err = w.Write(beBuf)
if err != nil {
return nil
}

w.Write(data)
return w.Bytes()
}

func frameMessage(m *nsq.Message) []byte {
var b bytes.Buffer
m.WriteTo(&b)
return b.Bytes()
}

0 comments on commit 6162030

Please sign in to comment.