Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plugin: twemproxy #365

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next commit
Add plugin for Twemproxy
This plugin collects data from Twemproxy's stats interface.
  • Loading branch information
codeb2cc committed Nov 13, 2015
commit 49c1bf3ef6fa0d20c20b0a521cef816e6cc0b673
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
_ "github.com/influxdb/telegraf/plugins/rethinkdb"
_ "github.com/influxdb/telegraf/plugins/statsd"
_ "github.com/influxdb/telegraf/plugins/system"
_ "github.com/influxdb/telegraf/plugins/twemproxy"
_ "github.com/influxdb/telegraf/plugins/zfs"
_ "github.com/influxdb/telegraf/plugins/zookeeper"
)
182 changes: 182 additions & 0 deletions plugins/twemproxy/twemproxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package twemproxy

import (
"encoding/json"
"errors"
"io/ioutil"
"net"
"strings"
"sync"
"time"

"github.com/influxdb/telegraf/plugins"
)

type (
	// Twemproxy is the plugin's top-level configuration: the set of
	// Twemproxy instances whose stats are gathered.
	Twemproxy struct {
		// Instances lists every Twemproxy instance to poll.
		Instances []TwemproxyInstance
	}

	// TwemproxyInstance describes a single Twemproxy process to poll.
	TwemproxyInstance struct {
		// StatsAddr is the address of the stats endpoint, dialed over TCP.
		StatsAddr string
		// Pools names the pools whose stats are reported; pools present in
		// the stats payload but not listed here are ignored.
		Pools []string
	}
)

// sampleConfig is the example configuration text returned by SampleConfig.
// It shows one instance entry with its stats address and monitored pools.
var sampleConfig = `
# Twemproxy plugin config
[twemproxy]
[[twemproxy.instances]]
# Twemproxy stats address and port(NO scheme!)
statsAddr = "10.16.29.1:22222"
# Monitor pool name
pools = ["redis_pool", "mc_pool"]
`

// SampleConfig returns the example configuration text for this plugin.
func (t *Twemproxy) SampleConfig() string {
return sampleConfig
}

// Description returns a one-line summary of what this plugin does.
func (t *Twemproxy) Description() string {
return "Read Twemproxy stats data"
}

// Gather collects stats from every configured Twemproxy instance, running
// one goroutine per instance against the shared accumulator.
//
// It returns nil when every instance succeeds. Otherwise the messages of all
// failed instances are joined with newlines into a single error.
func (t *Twemproxy) Gather(acc plugins.Accumulator) error {
	// Buffered to the instance count so a failing goroutine never blocks on send.
	failures := make(chan error, len(t.Instances))

	var pending sync.WaitGroup
	pending.Add(len(t.Instances))
	for i := range t.Instances {
		// Each goroutine works on its own copy of the instance.
		go func(instance TwemproxyInstance) {
			defer pending.Done()
			err := instance.Gather(acc)
			if err != nil {
				failures <- err
			}
		}(t.Instances[i])
	}
	pending.Wait()

	// All senders are done, so the channel can be closed and drained.
	close(failures)
	messages := []string{}
	for failure := range failures {
		messages = append(messages, failure.Error())
	}
	if len(messages) > 0 {
		return errors.New(strings.Join(messages, "\n"))
	}
	return nil
}

// Gather data from one Twemproxy.
//
// It dials the instance's stats address over TCP, reads the response until
// the peer closes the connection, decodes it as a JSON object and passes it
// to processStat together with a "twemproxy" tag holding the stats address.
//
// It returns an error when connecting, setting the read deadline, reading or
// JSON decoding fails.
func (ti *TwemproxyInstance) Gather(
	acc plugins.Accumulator,
) error {
	const (
		dialTimeout = 1 * time.Second
		readTimeout = 5 * time.Second
	)

	conn, err := net.DialTimeout("tcp", ti.StatsAddr, dialTimeout)
	if err != nil {
		return err
	}
	// Release the socket on every return path.
	defer conn.Close()

	// Bound the read so a peer that accepts the connection but never sends
	// or closes cannot block this gather indefinitely.
	if err = conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
		return err
	}
	body, err := ioutil.ReadAll(conn)
	if err != nil {
		return err
	}

	var stats map[string]interface{}
	if err = json.Unmarshal(body, &stats); err != nil {
		// Keep the decoder's message so the failure can be diagnosed.
		return errors.New("Error decoding JSON response: " + err.Error())
	}

	tags := make(map[string]string)
	tags["twemproxy"] = ti.StatsAddr
	ti.processStat(acc, tags, stats)

	return nil
}

// processStat reports the top-level Twemproxy stats and then descends into
// each configured pool.
//
// When data carries a string "source" value it is written into tags, so the
// caller's map is modified. The numeric top-level metrics are added under
// their own names. Each pool listed in ti.Pools that appears in data as a
// JSON object is processed with a copy of tags extended by a "pool" tag and
// with "<pool>_" as the measurement prefix.
func (ti *TwemproxyInstance) processStat(
	acc plugins.Accumulator,
	tags map[string]string,
	data map[string]interface{},
) {
	// A failed assertion also covers the case where the key is absent.
	if host, isString := data["source"].(string); isString {
		tags["source"] = host
	}

	for _, name := range [...]string{"total_connections", "curr_connections", "timestamp"} {
		if n, isNumber := data[name].(float64); isNumber {
			acc.Add(name, n, tags)
		}
	}

	for _, poolName := range ti.Pools {
		poolStats, isObject := data[poolName].(map[string]interface{})
		if !isObject {
			continue
		}
		// Each pool gets its own tag set so the "pool" tag does not leak
		// into the tags used for other pools.
		poolTags := copyTags(tags)
		poolTags["pool"] = poolName
		ti.processPool(acc, poolTags, poolName+"_", poolStats)
	}
}

// processPool reports the stats of one pool.
//
// The known pool-level counters are added as prefix+key when their value is
// numeric. Any other key whose value is a JSON object is treated as a backend
// server: it is processed with a copy of tags extended by a "server" tag set
// to that key.
func (ti *TwemproxyInstance) processPool(
	acc plugins.Accumulator,
	tags map[string]string,
	prefix string,
	data map[string]interface{},
) {
	// Tag sets already built for a server, keyed by server name.
	tagsByServer := make(map[string]map[string]string)

	for name, raw := range data {
		switch name {
		case "client_connections", "forward_error", "client_err", "server_ejects", "fragments", "client_eof":
			n, isNumber := raw.(float64)
			if !isNumber {
				continue
			}
			acc.Add(prefix+name, n, tags)
		default:
			serverStats, isObject := raw.(map[string]interface{})
			if !isObject {
				continue
			}
			serverTags, seen := tagsByServer[name]
			if !seen {
				serverTags = copyTags(tags)
				serverTags["server"] = name
				tagsByServer[name] = serverTags
			}
			ti.processServer(acc, serverTags, prefix, serverStats)
		}
	}
}

// processServer reports the stats of one backend server (redis/memcached).
// Every numeric entry in data is added as prefix+key with the given tags;
// non-numeric entries are skipped.
func (ti *TwemproxyInstance) processServer(
	acc plugins.Accumulator,
	tags map[string]string,
	prefix string,
	data map[string]interface{},
) {
	for name, raw := range data {
		n, isNumber := raw.(float64)
		if !isNumber {
			continue
		}
		acc.Add(prefix+name, n, tags)
	}
}

// copyTags returns a new map holding the same key/value pairs as tags.
// Tags are not expected to be mutated after being passed to Add, so callers
// take a copy before adding further tags. The result is never nil, even for
// a nil input.
func copyTags(tags map[string]string) map[string]string {
	duplicate := make(map[string]string, len(tags))
	for name := range tags {
		duplicate[name] = tags[name]
	}
	return duplicate
}

// init registers the plugin under the name "twemproxy" with a factory that
// returns a fresh, empty Twemproxy value.
func init() {
	factory := func() plugins.Plugin {
		return new(Twemproxy)
	}
	plugins.Add("twemproxy", factory)
}
134 changes: 134 additions & 0 deletions plugins/twemproxy/twemproxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package twemproxy

import (
"net"
"testing"
"encoding/json"

"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// sampleStatsAddr is the fixed local address the mock stats server listens on
// and the plugin under test dials.
const sampleStatsAddr = "127.0.0.1:22222"

// sampleStats is the JSON payload served by the mock stats server: top-level
// counters plus one pool named "demo" containing two backend servers.
const sampleStats = `{
"total_connections": 276448,
"uptime": 160657,
"version": "0.4.1",
"service": "nutcracker",
"curr_connections": 1322,
"source": "server1.website.com",
"demo": {
"client_connections": 1305,
"forward_error": 11684,
"client_err": 147942,
"server_ejects": 0,
"fragments": 0,
"client_eof": 126813,
"10.16.29.1:6379": {
"requests": 43604566,
"server_eof": 0,
"out_queue": 0,
"server_err": 0,
"out_queue_bytes": 0,
"in_queue": 0,
"server_timedout": 24,
"request_bytes": 2775840400,
"server_connections": 1,
"response_bytes": 7663182096,
"in_queue_bytes": 0,
"server_ejected_at": 0,
"responses": 43603900
},
"10.16.29.2:6379": {
"requests": 37870211,
"server_eof": 0,
"out_queue": 0,
"server_err": 0,
"out_queue_bytes": 0,
"in_queue": 0,
"server_timedout": 25,
"request_bytes": 2412114759,
"server_connections": 1,
"response_bytes": 5228980582,
"in_queue_bytes": 0,
"server_ejected_at": 0,
"responses": 37869551
}
},
"timestamp": 1447312436
}`

// mockTwemproxyServer starts a TCP listener on sampleStatsAddr that serves
// sampleStats to the first client that connects and then closes that
// connection. Only one connection is ever served.
//
// It returns the listener, which the caller must close, or the error from
// net.Listen.
func mockTwemproxyServer() (net.Listener, error) {
	listener, err := net.Listen("tcp", sampleStatsAddr)
	if err != nil {
		return nil, err
	}
	go func(l net.Listener) {
		conn, err := l.Accept()
		if err != nil {
			// Accept failed (for example the listener was closed before any
			// client connected); there is no connection to serve.
			return
		}
		defer conn.Close()
		// Best effort: a failed write surfaces in the client as a short or
		// undecodable response.
		_, _ = conn.Write([]byte(sampleStats))
	}(listener)

	return listener, nil
}

// TestGather runs a single instance's Gather against the mock stats server
// and checks that every top-level, pool-level and server-level value from
// sampleStats was recorded with the expected measurement name and tags.
func TestGather(t *testing.T) {
	mockServer, err := mockTwemproxyServer()
	// Fail through the test framework instead of panicking so the failure is
	// reported against this test.
	require.NoError(t, err)
	defer mockServer.Close()

	twemproxy := &Twemproxy{
		Instances: []TwemproxyInstance{
			{
				StatsAddr: sampleStatsAddr,
				Pools:     []string{"demo"},
			},
		},
	}

	var acc testutil.Accumulator
	err = twemproxy.Instances[0].Gather(&acc)
	require.NoError(t, err)

	// Decode the fixture independently to derive the expected values.
	var sourceData map[string]interface{}
	require.NoError(t, json.Unmarshal([]byte(sampleStats), &sourceData))

	metrics := []string{"total_connections", "curr_connections", "timestamp"}
	tags := map[string]string{
		"twemproxy": sampleStatsAddr,
		"source":    sourceData["source"].(string),
	}
	for _, m := range metrics {
		assert.NoError(t, acc.ValidateTaggedValue(m, sourceData[m].(float64), tags))
	}

	poolName := "demo"
	poolMetrics := []string{
		"client_connections", "forward_error", "client_err", "server_ejects",
		"fragments", "client_eof",
	}
	// Pool-level values carry the additional "pool" tag.
	tags["pool"] = poolName
	poolData := sourceData[poolName].(map[string]interface{})
	for _, m := range poolMetrics {
		measurement := poolName + "_" + m
		assert.NoError(t, acc.ValidateTaggedValue(measurement, poolData[m].(float64), tags))
	}

	poolServers := []string{"10.16.29.1:6379", "10.16.29.2:6379"}
	for _, s := range poolServers {
		// Server-level values carry the "server" tag on top of the pool tags.
		tags["server"] = s
		serverData := poolData[s].(map[string]interface{})
		for k, v := range serverData {
			measurement := poolName + "_" + k
			assert.NoError(t, acc.ValidateTaggedValue(measurement, v, tags))
		}
	}
}