Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syslog input plugin #4181

Merged
merged 16 commits into from
May 25, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update: Syslog address option must contain protocol now
  • Loading branch information
leodido committed May 24, 2018
commit 6ca6e91843e2e27bd66100c066683826a3891351
14 changes: 4 additions & 10 deletions plugins/inputs/syslog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,11 @@ This plugin listens for syslog messages following RFC5424 format. When received

```toml
[[inputs.syslog]]
## Specify an ip or hostname with port - eg., localhost:6514, 10.0.0.1:6514
## Address and port to host the syslog receiver.
## If no server is specified, then localhost is used as the host.
## Specify an ip or hostname with port - eg., tcp://localhost:6514, tcp://10.0.0.1:6514
## Protocol, address and port to host the syslog receiver.
## If no host is specified, then localhost is used.
## If no port is specified, 6514 is used (RFC5425#section-4.1).
server = ":6514"

## Protocol (default = tcp)
## Should be one of the following values:
## tcp, tcp4, tcp6, unix, unixpacket, udp, udp4, udp6, ip, ip4, ip6, unixgram.
## Otherwise forced to the default.
# protocol = "tcp"
server = "tcp://:6514"

## TLS Config
# tls_allowed_cacerts = ["/etc/telegraf/ca.pem"]
Expand Down
7 changes: 1 addition & 6 deletions plugins/inputs/syslog/rfc5425_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (
"github.com/stretchr/testify/require"
)

const (
address = ":6514"
)

var (
pki = testutil.NewPKI("../../../testutil/pki")
)
Expand Down Expand Up @@ -352,8 +348,7 @@ func newTCPSyslogReceiver(keepAlive *internal.Duration, maxConn int, bestEffort
Duration: defaultReadTimeout,
}
s := &Syslog{
Protocol: "tcp",
Address: address,
Address: "tcp://" + address,
now: func() time.Time {
return defaultTime
},
Expand Down
6 changes: 2 additions & 4 deletions plugins/inputs/syslog/rfc5426_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,7 @@ func getTestCasesForRFC5426() []testCase5426 {

func newUDPSyslogReceiver(bestEffort bool) *Syslog {
return &Syslog{
Protocol: "udp",
Address: address,
Address: "udp://" + address,
now: func() time.Time {
return defaultTime
},
Expand All @@ -220,7 +219,6 @@ func testRFC5426(t *testing.T, bestEffort bool) {
t.Run(tc.name, func(t *testing.T) {
// Create receiver
receiver := newUDPSyslogReceiver(bestEffort)
require.Equal(t, receiver.Protocol, "udp")
acc := &testutil.Accumulator{}
require.NoError(t, receiver.Start(acc))
defer receiver.Stop()
Expand All @@ -231,8 +229,8 @@ func testRFC5426(t *testing.T, bestEffort bool) {

// Connect
conn, err := net.Dial("udp", address)
defer conn.Close()
require.NotNil(t, conn)
defer conn.Close()
require.Nil(t, err)

// Write
Expand Down
74 changes: 51 additions & 23 deletions plugins/inputs/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net"
"net/url"
"os"
"strings"
"sync"
Expand All @@ -23,9 +24,8 @@ const ipMaxPacketSize = 64 * 1024

// Syslog is a syslog plugin
type Syslog struct {
Address string `toml:"server"`
Protocol string
tlsConfig.ServerConfig
Address string `toml:"server"`
KeepAlivePeriod *internal.Duration
ReadTimeout *internal.Duration
MaxConnections int
Expand All @@ -38,7 +38,7 @@ type Syslog struct {
wg sync.WaitGroup
io.Closer

isTCP bool
isStream bool
tcpListener net.Listener
tlsConfig *tls.Config
connections map[string]net.Conn
Expand All @@ -48,17 +48,11 @@ type Syslog struct {
}

var sampleConfig = `
## Specify an ip or hostname with port - eg., localhost:6514, 10.0.0.1:6514
## Address and port to host the syslog receiver.
## If no server is specified, then localhost is used as the host.
## Specify an ip or hostname with port - eg., tcp://localhost:6514, tcp://10.0.0.1:6514
## Protocol, address and port to host the syslog receiver.
## If no host is specified, then localhost is used.
## If no port is specified, 6514 is used (RFC5425#section-4.1).
server = ":6514"

## Protocol (default = tcp)
## Should be one of the following values:
## tcp, tcp4, tcp6, unix, unixpacket, udp, udp4, udp6, ip, ip4, ip6, unixgram.
## Otherwise forced to the default.
# protocol = "tcp"
server = "tcp://:6514"

## TLS Config
# tls_allowed_cacerts = ["/etc/telegraf/ca.pem"]
Expand Down Expand Up @@ -112,22 +106,27 @@ func (s *Syslog) Start(acc telegraf.Accumulator) error {
s.mu.Lock()
defer s.mu.Unlock()

switch s.Protocol {
scheme, host, err := getAddressParts(s.Address)
if err != nil {
return err
}
s.Address = host

switch scheme {
case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
s.isTCP = true
s.isStream = true
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
s.isTCP = false
s.isStream = false
default:
s.Protocol = "tcp"
s.isTCP = true
return fmt.Errorf("unknown protocol '%s' in '%s'", scheme, s.Address)
}

if s.Protocol == "unix" || s.Protocol == "unixpacket" || s.Protocol == "unixgram" {
if scheme == "unix" || scheme == "unixpacket" || scheme == "unixgram" {
os.Remove(s.Address)
}

if s.isTCP {
l, err := net.Listen(s.Protocol, s.Address)
if s.isStream {
l, err := net.Listen(scheme, s.Address)
if err != nil {
return err
}
Expand All @@ -141,7 +140,7 @@ func (s *Syslog) Start(acc telegraf.Accumulator) error {
s.wg.Add(1)
go s.listenStream(acc)
} else {
l, err := net.ListenPacket(s.Protocol, s.Address)
l, err := net.ListenPacket(scheme, s.Address)
if err != nil {
return err
}
Expand All @@ -152,7 +151,7 @@ func (s *Syslog) Start(acc telegraf.Accumulator) error {
go s.listenPacket(acc)
}

if s.Protocol == "unix" || s.Protocol == "unixpacket" || s.Protocol == "unixgram" {
if scheme == "unix" || scheme == "unixpacket" || scheme == "unixgram" {
s.Closer = unixCloser{path: s.Address, closer: s.Closer}
}

Expand All @@ -170,6 +169,35 @@ func (s *Syslog) Stop() {
s.wg.Wait()
}

// getAddressParts returns the address scheme and host
// it also sets defaults for them when missing
// when the input address does not specify the protocol it returns an error
func getAddressParts(a string) (string, string, error) {
parts := strings.SplitN(a, "://", 2)
if len(parts) != 2 {
return "", "", fmt.Errorf("missing protocol within address '%s'", a)
}

u, _ := url.Parse(a)
switch u.Scheme {
case "unix", "unixpacket", "unixgram":
return parts[0], parts[1], nil
}

var host string
if u.Hostname() != "" {
host = u.Hostname()
}
host += ":"
if u.Port() == "" {
host += "6514"
} else {
host += u.Port()
}

return u.Scheme, host, nil
}

func (s *Syslog) listenPacket(acc telegraf.Accumulator) {
defer s.wg.Done()
b := make([]byte, ipMaxPacketSize)
Expand Down
41 changes: 37 additions & 4 deletions plugins/inputs/syslog/syslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"github.com/stretchr/testify/require"
)

const (
address = ":6514"
)

var defaultTime = time.Unix(0, 0)
var maxP = uint8(191)
var maxV = uint16(999)
Expand All @@ -19,9 +23,38 @@ var maxPID = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabc
var maxMID = "abcdefghilmnopqrstuvzabcdefghilm"
var message7681 = strings.Repeat("l", 7681)

func TestListenError(t *testing.T) {
receiver := &Syslog{
Address: "wrong address",
func TestAddress(t *testing.T) {
var err error
var rec *Syslog

rec = &Syslog{
Address: "localhost:6514",
}
err = rec.Start(&testutil.Accumulator{})
require.EqualError(t, err, "missing protocol within address 'localhost:6514'")
require.Error(t, err)

rec = &Syslog{
Address: "unsupported://example.com:6514",
}
err = rec.Start(&testutil.Accumulator{})
require.EqualError(t, err, "unknown protocol 'unsupported' in 'example.com:6514'")
require.Error(t, err)

rec = &Syslog{
Address: "unixgram:///tmp/telegraf.sock",
}
err = rec.Start(&testutil.Accumulator{})
require.NoError(t, err)
require.Equal(t, "/tmp/telegraf.sock", rec.Address)
rec.Stop()

// Default port is 6514
rec = &Syslog{
Address: "tcp://localhost",
}
require.Error(t, receiver.Start(&testutil.Accumulator{}))
err = rec.Start(&testutil.Accumulator{})
require.NoError(t, err)
require.Equal(t, "localhost:6514", rec.Address)
rec.Stop()
}