Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nsq_consumer): Add input plugin to consume metrics from an nsqd topic #1369

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
feat(nsq_consumer): Add input plugin to consume metrics from an nsqd …
…topic
  • Loading branch information
Jonathan Chauncey committed Jun 22, 2016
commit 19149d3e132a5dc334d75cfd71c9436ccf50e88b
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
- [#1390](https://github.com/influxdata/telegraf/pull/1390): Add support for Tengine
- [#1320](https://github.com/influxdata/telegraf/pull/1320): Logparser input plugin for parsing grok-style log patterns.
- [#1397](https://github.com/influxdata/telegraf/issues/1397): ElasticSearch: now supports connecting to ElasticSearch via SSL
- [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD.


### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ Telegraf can also collect metrics via the following service plugins:
* [mqtt_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/mqtt_consumer)
* [kafka_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/kafka_consumer)
* [nats_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nats_consumer)
* [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer)
* [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks)
* [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks)

Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/net_response"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/nstat"
_ "github.com/influxdata/telegraf/plugins/inputs/ntpq"
_ "github.com/influxdata/telegraf/plugins/inputs/passenger"
Expand Down
25 changes: 25 additions & 0 deletions plugins/inputs/nsq_consumer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# NSQ Consumer Input Plugin

The [NSQ](http://nsq.io/) consumer plugin subscribes to a specified NSQD
topic and channel and adds the messages it receives as Telegraf metrics. Messages may be in any of the supported `data_format` types.

## Configuration

```toml
# Read metrics from NSQD topic(s)
[[inputs.nsq_consumer]]
## A string representing the NSQD TCP endpoint
server = "localhost:4150"
topic = "telegraf"
channel = "consumer"
max_in_flight = 100

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```

## Testing
The `nsq_consumer_test` mocks out the interaction with `NSQD`. It requires no outside dependencies.
99 changes: 99 additions & 0 deletions plugins/inputs/nsq_consumer/nsq_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package nsq_consumer

import (
"log"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/nsqio/go-nsq"
)

// NSQConsumer represents the configuration of the plugin, plus the runtime
// state (parser, consumer, accumulator) that is filled in by SetParser and
// Start.
type NSQConsumer struct {
	// Server is the nsqd address passed to ConnectToNSQD in Start.
	Server string
	// Topic is the NSQ topic passed to nsq.NewConsumer in connect.
	Topic string
	// Channel is the NSQ channel passed to nsq.NewConsumer in connect.
	Channel string
	// MaxInFlight is copied into the nsq config in connect and is also used
	// as the number of concurrent handlers registered in Start.
	MaxInFlight int
	// parser decodes message bodies into metrics; set via SetParser.
	parser parsers.Parser
	// consumer is created lazily by connect and stopped by Stop.
	consumer *nsq.Consumer
	// acc receives the parsed metrics; set in Start.
	acc telegraf.Accumulator
}

// sampleConfig is the example TOML configuration returned by SampleConfig.
// It lists the server, topic, channel and max_in_flight settings together
// with the data_format option.
var sampleConfig = `
## An string representing the NSQD TCP Endpoint
server = "localhost:4150"
topic = "telegraf"
channel = "consumer"
max_in_flight = 100

## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`

func init() {
inputs.Add("nsq_consumer", func() telegraf.Input {
return &NSQConsumer{}
})
}

// SetParser stores the given parser on the plugin. The message handler
// registered in Start uses it to decode each message body into metrics.
func (n *NSQConsumer) SetParser(parser parsers.Parser) {
	n.parser = parser
}

// SampleConfig returns the package-level sampleConfig string, used for
// generating a sample configuration file.
func (n *NSQConsumer) SampleConfig() string {
	return sampleConfig
}

// Description returns a one-line, human-readable summary of the plugin.
func (n *NSQConsumer) Description() string {
	return "Read NSQ topic for metrics."
}

// Start creates the NSQ consumer (if it does not exist yet), registers the
// message handler and connects to the nsqd instance at n.Server.
//
// Every message body is run through the configured parser and each resulting
// metric is added to acc. It returns an error when the consumer cannot be
// created or when the connection to nsqd cannot be established; previously
// both failures were silently ignored, which either panicked on the nil
// consumer or left the plugin running without ever receiving a message.
func (n *NSQConsumer) Start(acc telegraf.Accumulator) error {
	n.acc = acc
	if err := n.connect(); err != nil {
		return err
	}

	n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
		metrics, err := n.parser.Parse(message.Body)
		if err != nil {
			log.Printf("NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error())
			// nil (not err) is returned on purpose; presumably so that a
			// malformed message is dropped instead of being redelivered --
			// confirm against go-nsq handler semantics.
			return nil
		}
		for _, metric := range metrics {
			n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
		}
		message.Finish()
		return nil
	}), n.MaxInFlight)

	return n.consumer.ConnectToNSQD(n.Server)
}

// Stop stops the underlying NSQ consumer so no further messages are
// processed. It is a no-op when no consumer was ever created (for example
// when Start was not called or failed before creating one), instead of
// dereferencing a nil consumer.
func (n *NSQConsumer) Stop() {
	if n.consumer == nil {
		return
	}
	n.consumer.Stop()
}

// Gather is a no-op that always returns nil: metrics are added to the
// accumulator by the message handler registered in Start, so there is
// nothing to collect here.
func (n *NSQConsumer) Gather(acc telegraf.Accumulator) error {
	return nil
}

// connect lazily builds the nsq.Consumer for the configured topic and
// channel, applying MaxInFlight to the client config. When a consumer already
// exists it is left untouched. The error from nsq.NewConsumer, if any, is
// returned as-is.
func (n *NSQConsumer) connect() error {
	if n.consumer != nil {
		return nil
	}

	cfg := nsq.NewConfig()
	cfg.MaxInFlight = n.MaxInFlight

	c, err := nsq.NewConsumer(n.Topic, n.Channel, cfg)
	if err != nil {
		return err
	}
	n.consumer = c
	return nil
}
245 changes: 245 additions & 0 deletions plugins/inputs/nsq_consumer/nsq_consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
package nsq_consumer

import (
"bufio"
"bytes"
"encoding/binary"
"io"
"log"
"net"
"strconv"
"testing"
"time"

"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/nsqio/go-nsq"
"github.com/stretchr/testify/assert"
)

// TestReadsMetricsFromNSQ runs the consumer against a scripted, in-process
// stand-in for nsqd and verifies that a single influx line-protocol message
// ends up as exactly one point in the accumulator. It is modeled after the
// kafka consumer integration test.
func TestReadsMetricsFromNSQ(t *testing.T) {
	id := nsq.MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
	payload := []byte("cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257")
	msg := nsq.NewMessage(id, payload)

	script := []instruction{
		// OK replies for the client's SUB and IDENTIFY commands
		{0, nsq.FrameTypeResponse, []byte("OK")},
		{0, nsq.FrameTypeResponse, []byte("OK")},
		// the one message under test
		{20 * time.Millisecond, nsq.FrameTypeMessage, frameMessage(msg)},
		// needed to exit test
		{100 * time.Millisecond, -1, []byte("exit")},
	}

	mockAddr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:4155")
	newMockNSQD(script, mockAddr.String())

	consumer := &NSQConsumer{
		Server:      "127.0.0.1:4155",
		Topic:       "telegraf",
		Channel:     "consume",
		MaxInFlight: 1,
	}
	parser, _ := parsers.NewInfluxParser()
	consumer.SetParser(parser)

	var acc testutil.Accumulator
	assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")

	if err := consumer.Start(&acc); err != nil {
		t.Fatal(err.Error())
	}
	defer consumer.Stop()

	waitForPoint(&acc, t)

	if len(acc.Metrics) != 1 {
		t.Errorf("No points found in accumulator, expected 1")
		return
	}

	point := acc.Metrics[0]
	wantTags := map[string]string{
		"host":      "server01",
		"direction": "in",
		"region":    "us-west",
	}
	assert.Equal(t, "cpu_load_short", point.Measurement)
	assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
	assert.Equal(t, wantTags, point.Tags)
	assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix())
}

// waitForPoint blocks until the accumulator reports exactly one field, i.e.
// until the message sent by the mock nsqd has reached the consumer. It checks
// every 5ms and fails the test after more than 1000 checks (about 5 seconds).
func waitForPoint(acc *testutil.Accumulator, t *testing.T) {
	const maxTicks = 1000

	ticker := time.NewTicker(5 * time.Millisecond)
	defer ticker.Stop()

	ticks := 0
	for range ticker.C {
		ticks++
		if ticks > maxTicks {
			t.Fatal("Waited for 5s, point never arrived to consumer")
		}
		if acc.NFields() == 1 {
			return
		}
	}
}

// newMockNSQD starts a mock nsqd that listens on addr and plays back script
// to the clients that connect. The accept loop runs in its own goroutine; the
// returned mock exposes the bound address and the exit channel.
//
// A listen failure terminates the process via log.Fatalf. The message reports
// the requested addr: the mock's tcpAddr field is not populated until the
// listener exists, so formatting it here would never show the real address.
func newMockNSQD(script []instruction, addr string) *mockNSQD {
	tcpListener, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("FATAL: listen (%s) failed - %s", addr, err)
	}

	n := &mockNSQD{
		script:      script,
		exitChan:    make(chan int),
		tcpListener: tcpListener,
		tcpAddr:     tcpListener.Addr().(*net.TCPAddr),
	}

	go n.listen()

	return n
}

// The code below allows us to mock the interactions with nsqd. This is taken from:
// https://github.com/nsqio/go-nsq/blob/master/mock_test.go

// instruction is one scripted step of the mock nsqd: a frame to send to the
// client after a delay.
type instruction struct {
	// delay is how long the mock waits before acting on this step.
	delay time.Duration
	// frameType is the frame type written in the frame header.
	frameType int32
	// body is the frame payload; the literal body "exit" makes the mock
	// close the connection instead of sending anything.
	body []byte
}

// mockNSQD is a minimal scripted stand-in for an nsqd TCP server.
type mockNSQD struct {
	// script is the sequence of frames played back to a connected client.
	script []instruction
	// got records every command line received from clients.
	got [][]byte
	// tcpAddr is the address the listener is bound to.
	tcpAddr *net.TCPAddr
	// tcpListener accepts the client connections.
	tcpListener net.Listener
	// exitChan is closed by listen once the accept loop has ended.
	exitChan chan int
}

// listen accepts connections until Accept fails (for instance because the
// listener was closed), serving each one in its own goroutine. exitChan is
// closed once the accept loop is over.
func (n *mockNSQD) listen() {
	defer close(n.exitChan)

	for {
		c, acceptErr := n.tcpListener.Accept()
		if acceptErr != nil {
			return
		}
		go n.handle(c)
	}
}

// handle plays the scripted nsqd side of a single client connection.
//
// It reads the 4-byte protocol magic, then alternates between two event
// sources: command lines sent by the client (recorded in n.got; IDENTIFY
// bodies are consumed, RDY updates the ready count) and a timer that sends
// the next scripted frame. Message frames are only sent while the client's
// ready count is non-zero. The conversation ends when the script is
// exhausted, when an instruction with body "exit" is reached, or on an I/O
// error; in every case both the listener and the connection are closed.
//
// Compared with the upstream mock this version does not index past the end
// of the script, tolerates an empty script and a bare "RDY" line, logs errors
// without treating them as format strings, and releases the reader goroutine
// when the handler returns.
func (n *mockNSQD) handle(conn net.Conn) {
	defer func() {
		n.tcpListener.Close()
		conn.Close()
	}()

	// The client opens with a 4-byte protocol magic; its value is not inspected.
	buf := make([]byte, 4)
	if _, err := io.ReadFull(conn, buf); err != nil {
		log.Fatalf("ERROR: failed to read protocol version - %s", err)
	}
	if len(n.script) == 0 {
		return
	}

	// nextDelay is the wait before the step after i. For the final step there
	// is no successor, so its own delay is reused rather than indexing past
	// the end of the script.
	nextDelay := func(i int) time.Duration {
		if i+1 < len(n.script) {
			return n.script[i+1].delay
		}
		return n.script[i].delay
	}

	readChan := make(chan []byte)
	readDoneChan := make(chan int)
	scriptTime := time.After(n.script[0].delay)
	rdr := bufio.NewReader(conn)

	// done lets the reader goroutine stop when the handler returns while the
	// goroutine is parked on one of the channels below.
	done := make(chan struct{})
	defer close(done)

	go func() {
		for {
			line, err := rdr.ReadBytes('\n')
			if err != nil {
				return
			}
			// trim the '\n'
			line = line[:len(line)-1]
			select {
			case readChan <- line:
			case <-done:
				return
			}
			// Wait until the main loop has finished with rdr (it reads the
			// IDENTIFY body from the same reader) before reading again.
			select {
			case <-readDoneChan:
			case <-done:
				return
			}
		}
	}()

	var idx, rdyCount int
	for idx < len(n.script) {
		select {
		case line := <-readChan:
			n.got = append(n.got, line)
			params := bytes.Split(line, []byte(" "))
			switch {
			case bytes.Equal(params[0], []byte("IDENTIFY")):
				// IDENTIFY is followed by a 4-byte big-endian size and a
				// body of that many bytes; both are read and discarded.
				l := make([]byte, 4)
				if _, err := io.ReadFull(rdr, l); err != nil {
					log.Print(err.Error())
					return
				}
				size := int32(binary.BigEndian.Uint32(l))
				b := make([]byte, size)
				if _, err := io.ReadFull(rdr, b); err != nil {
					log.Print(err.Error())
					return
				}
			case bytes.Equal(params[0], []byte("RDY")):
				if len(params) > 1 {
					rdy, _ := strconv.Atoi(string(params[1]))
					rdyCount = rdy
				}
			case bytes.Equal(params[0], []byte("FIN")):
			case bytes.Equal(params[0], []byte("REQ")):
			}
			readDoneChan <- 1
		case <-scriptTime:
			inst := n.script[idx]
			if bytes.Equal(inst.body, []byte("exit")) {
				return
			}
			if inst.frameType == nsq.FrameTypeMessage {
				if rdyCount == 0 {
					// Client is not ready for messages yet: retry this same
					// instruction later without advancing the script.
					scriptTime = time.After(nextDelay(idx))
					continue
				}
				rdyCount--
			}
			if _, err := conn.Write(framedResponse(inst.frameType, inst.body)); err != nil {
				log.Print(err.Error())
				return
			}
			scriptTime = time.After(nextDelay(idx))
			idx++
		}
	}
}

// framedResponse encodes data as a single frame: a 4-byte big-endian size
// (payload length plus the 4 bytes of the frame type), the 4-byte big-endian
// frame type, then the payload itself.
func framedResponse(frameType int32, data []byte) []byte {
	const headerLen = 8

	var header [headerLen]byte
	binary.BigEndian.PutUint32(header[:4], uint32(len(data))+4)
	binary.BigEndian.PutUint32(header[4:], uint32(frameType))

	var frame bytes.Buffer
	frame.Grow(headerLen + len(data))
	frame.Write(header[:])
	frame.Write(data)
	return frame.Bytes()
}

// frameMessage serializes m with its WriteTo method into an in-memory buffer
// and returns the resulting bytes, for use as the body of a message frame.
func frameMessage(m *nsq.Message) []byte {
	encoded := new(bytes.Buffer)
	m.WriteTo(encoded)
	return encoded.Bytes()
}